Redis分布式锁

分布式锁是在分布式环境下(多个JVM进程)控制多个客户端对某一资源的同步访问的一种实现,与之相对应的是线程锁,线程锁控制的是同一个JVM进程内多个线程之间的同步。分布式锁的一般实现方法是在应用服务器之外通过一个共享的存储服务器存储锁资源,同一时刻只有一个客户端能占有锁资源来完成。

通常有基于Zookeeper,Redis,或数据库三种实现形式。

 

基于Redis实现分布式锁:

在分布式集群中,被分布式锁控制的方法或代码段同一时刻只能被一个客户端上面的一个线程执行,也就是互斥

锁信息需要设置过期时间,避免一个线程长期占有(比如在做解锁操作前异常退出)而导致死锁

加锁与解锁必须一致,谁加的锁,就由谁来解(或过期超时),一个客户端不能解开另一个客户端加的锁

加锁与解锁的过程必须保证原子性

 

redis依赖

<dependency>

   <groupId>org.springframework.boot</groupId>

   <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

 

 

/**

   * 尝试获取锁(立即返回)

   * @param key  锁的redis key

   * @param value 锁的value

   * @param expire 过期时间/秒

   * @return 是否获取成功

   */

public boolean lock(String key, String value, long expire) {

   return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);

}

/**

   * 尝试获取锁,并至多等待timeout时长

   *

   * @param key  锁的redis key

   * @param value 锁的value

   * @param expire 过期时间/秒

   * @param timeout 超时时长

   * @param unit    时间单位

   * @return 是否获取成功

   */

public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {

   long waitMillis = unit.toMillis(timeout);

   long waitAlready = 0;

   while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {

       try {

           Thread.sleep(waitMillisPer);

       } catch (InterruptedException e) {

           log.error("Interrupted when trying to get a lock. key: {}", key, e);

       }

       waitAlready += waitMillisPer;

   }

   if (waitAlready < waitMillis) {

       return true;

   }

   log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);

   return false;

}

 

 

上述实现如何满足前面提到的几点要求:

客户端互斥: 可以将expire过期时间设置为大于同步代码的执行时间,比如同步代码块执行时间为1s,则可将expire设置为3s或5s。

避免同步代码执行过程中expire时间到,其它客户端又可以获取锁执行同步代码块。

通过设置过期时间expire来避免某个客户端长期占有锁。

通过value来控制谁加的锁,由谁解的逻辑,比如可以使用requestId作为value,requestId唯一标记一次请求。

setIfAbsent方法 底层通过调用 Redis 的 SETNX 命令,操作具备原子性。

错误示例:

 

public boolean lock(String key, String value, long expire) {

boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);

if(result) {

   stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);

}

    return result;

}

 

如果在result为true,但还没成功设置expire时,程序异常退出了,将导致该锁一直被占用而导致死锁,不满足第二点要求。

解锁实现
解锁也需要满足前面所述的四个要求,实现代码如下:

 

 
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;

/**

* 释放锁

* @param  key  锁的redis key

* @param  value 锁的value

*/

public boolean unLock(String key, String value) {

DefaultRedisScript redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);

long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);

return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);

}

这段实现使用一个Lua脚本来实现解锁操作,保证操作的原子性。传入的value值需与该线程加锁时的value一致,可以使用requestId(具体实现下面给出)。

错误示例:

public boolean unLock(String key, String value) {

String oldValue = stringRedisTemplate.opsForValue().get(key);

if(value.equals(oldValue)) {

stringRedisTemplate.delete(key);

}

}

 

先获取锁的当前值,判断两值相等则删除

考虑一种极端情况,如果在判断为true时,刚好该锁过期时间到,另一个客户端加锁成功,则接下来的delete将不管三七二十一将别人加的锁直接删掉了,不满足第三点要求。

该示例主要是因为没有保证解锁操作的原子性导致。

注解支持
为了方便使用,添加一个注解,可以放于方法上控制方法在分布式环境中的同步执行。

@Retention(RetentionPolicy.RUNTIME)

@Target(ElementType.METHOD)  

public @interface  DistributedLockable {  

String key();

String prefix() default "disLock:";

long expire() default 10L; // 默认10s过期

}

 

添加一个切面来解析注解的处理,

/**

• 分布式锁注解处理切面

*/

@Aspect  

@Slf4j  

public class DistributedLockAspect {

private DistributedLock lock;

public DistributedLockAspect(DistributedLock lock) {

this.lock = lock;

}

/**

• 在方法上执行同步锁

*/

@Around(value  = "@annotation(lockable)")  

public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {

boolean locked = false;

String key = lockable.prefix() + lockable.key();

try {

locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());

if(locked) {

return point.proceed();

 } else {

log.info("Did not get a lock for key {}", key);

return null;

  }

} catch (Exception e) {

throw e;

} finally {

if(locked) {

if(!lock.unLock(key, WebUtil.getRequestId())){

log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());

 }

}

}

}

}

RequestId 的实现如下,通过注册一个Filter,在请求开始时生成一个uuid存于ThreadLocal中,在请求返回时清除。

public class WebUtil {

   

public static final String REQ_ID_HEADER = "Req-Id";

private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();

public static void setRequestId(String requestId) {

   reqIdThreadLocal.set(requestId);

}

public static String getRequestId(){

   String requestId = reqIdThreadLocal.get();

   if(requestId == null) {

       requestId = ObjectId.next();

       reqIdThreadLocal.set(requestId);

   }

   return requestId;

}

public static void removeRequestId() {

   reqIdThreadLocal.remove();

}

}

public class RequestIdFilter implements Filter {

@Override  

 public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {

HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;

String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);

//没有则生成一个

if (StringUtils.isEmpty(reqId)) {

reqId = ObjectId.next();

}

WebUtil.setRequestId(reqId);

try {

filterChain.doFilter(servletRequest, servletResponse);

} finally {

WebUtil.removeRequestId();

}

}

}

//在配置类中注册Filter

/**

• 添加RequestId

• @return  

*/

@Bean  

public FilterRegistrationBean requestIdFilter() {

RequestIdFilter reqestIdFilter = new RequestIdFilter();

FilterRegistrationBean registrationBean = new FilterRegistrationBean();

registrationBean.setFilter(reqestIdFilter);

List urlPatterns = Collections.singletonList("/*");

registrationBean.setUrlPatterns(urlPatterns);

registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);

return registrationBean;

}

 

 

使用注解

 

@DistributedLockable(key  = "test", expire = 10)  

public void test(){

System.out.println("线程-"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now());

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("线程-"+Thread.currentThread().getName()+"结束执行..." + LocalDateTime.now());

}

 

 

上一篇:expire_logs_day自动过期清理binlog


下一篇:【Python机器学习实战】聚类算法——层次聚类(HAC)和DBSCAN