分布式锁的特点
- 互斥性
- 锁超时
- 可重入性
- 支持阻塞和非阻塞
- 性能好
Redisson实现分布式锁示例
public class DistributedLockerImpl {
private RedissonClient redissonClient;
//加锁
public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
//获取锁对象
RLock lock = redissonClient.getLock(lockKey);
try {
//最常用的使用方法
//waitTime:获取锁的最大等待时间
//leaseTime:获取锁成功的持有时间,过期自动解锁
//unit:时间单位
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
log.error("tryLock error", e);
Thread.currentThread().interrupt();
return false;
}
}
//解锁
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
if(lock != null) {
lock.unlock();
}
}
}
加锁源码分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//加锁核心代码
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
/**
订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件
一旦锁释放会发消息通知待等待的线程进行竞争.
*/
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
//如果获取锁的耗时超过最大等待时间,加锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//在最大等待时间内循环获取锁
while (true) {
long currentTime = System.currentTimeMillis();
//获取锁
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
//判断时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message,等待解锁消息
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
//更新等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
//取消订阅消息
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
如果tryAcquire()获取锁成功,返回null,ttl为已存在锁的过期时间。
如果tryAcquire()获取锁失败,判断最大等待时间是否大于加锁耗时时间,如果小于,直接返回false加锁失败。否则,客户端的线程id通过redis的channel订阅锁释放的事件,超过最大等待时间,则加锁失败,如果等到了锁的释放事件通知,则进入一个不断获取锁的循环,尝试加锁。
加锁核心代码:
tryAcquireAsync():
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果leaseTime!=-1,加锁成功直接返回,否则启动一个Watch Dog线程,定期延长持有锁时间
if (leaseTime != -1) {
//加锁
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 续锁逻辑
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync():
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
"redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
"redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call(‘pttl‘, KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
debug结果:
lua 脚本含义:
第一段 if 判断语句,就是用 exists myLock
命令判断一下,如果要加锁的key 不存在,就使用 hincrby
命令设置一个 hash 结构,接着会执行 pexpire myLock 1800000
命令,设置 myLock 这个锁 key 的生存时间是 1800 秒。到此为止,加锁完成。
如果此时有第二个客户端请求加锁,就会进入第二个if判断语句,判断myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,如果不是,客户端 2 会执行:return redis.call(‘pttl‘, KEYS[1]);
返回了myLock持有锁的时间。
可重入锁:
如果myLock 锁 key 的 hash 数据结构中,包含客户端的 ID,就会执行hincrby myLock 5f431de4-a523-4bd7-965b-32b83fee3897:366 1
,加锁次数加1,此时存储结果如下:
redis的key为锁名称(所示myLock),hash结构key为客户端ID,value为客户端加锁的次数
解锁源码分析
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
//取消watch dog机制
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call(‘del‘, KEYS[1]); " +
"redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
lua脚本含义:
判断锁搜否存在,如果存在,将客户端ID对应的value值递减,直到为0后删除缓存,然后向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息
Redisson方案缺点
- 如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。在这期间,假如 Master 宕机,主备切换,slave 变为了 Master。此时客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,失去控制。这是Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。