一、回顾一下分布式锁的基本使用方式
①、注入redissonClient客户端
②、通过redissonClient客户端获取锁对象rLock
③、通过rLock尝试获取锁
// ①、注入redissonClient客户端
@Autowired
private RedissonClient redissonClient;
public boolean getLock() {
// ②、通过redissonClient客户端获取锁对象rLock(RedissonLock实现默认是可重入锁)
rLock = redissonClient.getLock(lockInfo.getName());
try {
// ③、通过rLock尝试获取锁
return rLock.tryLock(lockInfo.getWaitTime(), lockInfo.getLeaseTime(), TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("获取可重入锁时线程被意外中断,锁名称:{},异常信息:{}", lockInfo.getName(), e);
}
return false;
}
二、redisson实现分布式锁结构介绍
1、redisson锁实现的基本特性
- redisson对Redis的各种命令操作和特性都做了很好的封装和扩展,是一个非常灵活易用的Java中间件。
- 在redisson实现分布式锁这块最核心的类是
RedissonLock
。 -
RedissonLock
类提供了可重入锁的实现。 - redisson实现了公平锁(
RedissonFairLock
)、读锁(RedissonReadLock
)、写锁(RedissonWriteLock
)、可重入锁(RedissonLock
)、联锁(RedissonMultiLock
)、红锁(RedissonRedLock
)
2、锁实现的继承关系和子类实现
上面说到redisson实现分布式锁最核心的类是RedissonLock(很多锁都是基于该类实现的)
(红锁和联锁后面介绍),那么下面看看RedissonLock
的继承关系和子类以及类的结构,以便于更好的理解源码。
①、RedissonLock继承关系
②、RedissonLock的实现类
redissonLock类作为一个通用的模板类(默认提供可重入锁实现),其他很多类型的锁都基于该类去实现不同的加锁方式。
可以看到RedissonLock
类有四个子类实现,这些子类分别覆写了加锁释放锁的核心方法。
三、加锁方法分析
1、说明
- 上面理清了
RedissonLock
锁的继承以及实现关系,下面我们就找一个类型的锁实现来分析加锁解锁源码!这里以RedissonLock
默认实现的可重入锁为例进行源码分析。 -
RedissonLock
实现了多种加锁的接口,比如:带等待时间和释放时间的加锁接口(tryLock(long waitTime, long leaseTime, TimeUnit unit)
)、只带等待时间的加锁接口(tryLock(long waitTime, TimeUnit unit)
)、既不带等待时间也不带锁释放时间的加锁接口(tryLock()
)、只带释放时间没有等待时间的接口(lock(long leaseTime, TimeUnit unit)
)等,这里我们挑选一个参数最多的tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法来分析。 - 源码分析的方式,按照先画整体流程图,对整个加锁解锁流程有大致理解后再分析源码。
2、加锁整体流程分析
3、源码分析
下面所有的源码都来自于RedssionLock
类!!!
①、tryLock方法
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 1、将等待时间转换为毫秒、、获取当前时间、获取当前线程ID
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 2、尝试申请锁,返回还剩余的锁过期时间【加锁核心方法,下面(②、加锁核心方法tryAcquire分析)】
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 3、ttl==null 表示获取锁成功则直接返回true
// lock acquired
if (ttl == null) {
return true;
}
// 4、获取还需要等待的时间,且根据还需等待的时间(time)判断是否获取锁失败
time -= (System.currentTimeMillis() - current);
// 如果还需要等待的时间为0,则说明获取锁已经失败了
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 重新获取当前时间
current = System.currentTimeMillis();
// 5、上面第一次尝试获取锁失败,且还没有超出最大等待时间的基础上,基于Redis的发布订阅机制,订阅锁释放事件
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
/*
* 6、基于Redis的发布订阅机制,订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
* 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
* 当 this.await返回true,进入下面的循环再次尝试获取锁
*
* await是通过CountDownLatch + 监听器机制来实现的,具体看方法内部注释,见下面【③、await加锁最大等待时间方法分析】
*/
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
// 在等待时间耗完的情况下,取消对该锁的订阅
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
// 获取锁失败
acquireFailed(threadId);
return false;
}
// 7、如果在等待时间内订阅的锁已经被释放了,则会执行这里
try {
// 获取还需要等待的时间
time -= (System.currentTimeMillis() - current);
// 如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 8、能运行到这里则说明:
// 1、当前还在最大等待时间内
// 2、并且等待的锁已经被释放(即对该锁的订阅事件已经被吊起过),在这里可以再次尝试获取锁
// 这是一个死循环,循环退出条件有两个:
// ①、在最大等待时间内成功获取锁,返回true
// ②、超出了最大等待时间,但仍然没有成功获取到锁,返回false
while (true) {
// 获取当前时间
long currentTime = System.currentTimeMillis();
// 8.1、再次尝试申请锁,返回还剩余的锁过期时间
ttl = tryAcquire(leaseTime, unit, threadId);
// 8.2、ttl==null 表示获取锁成功则直接返回true
// lock acquired
if (ttl == null) {
return true;
}
// 再次计算还需要等待多时时间
time -= (System.currentTimeMillis() - currentTime);
// 8.3、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
// 更新一下当前时间,因为上面的操作可能会耗时,进而导致下面根据currentTime计算的time不准确
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 8.4、如果剩余时间(ttl)小于waittime ,就在 ttl 时间内,从Entry的信号量(Semaphore)获取一个许可(除非被中断或者一直没有可用的许可)。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 8.5、如果该锁剩余过期时间(ttl)大于waittime,则就在waittime 时间范围内等待可以通过信号量(Semaphore)
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
time -= (System.currentTimeMillis() - currentTime);
// 8.6、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 9、无论是否获得锁,都要取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
}
}
②、加锁核心方法tryAcquire分析
/**
* 尝试获取锁,如果没有获取到锁则返回该锁还剩余多少毫秒过期,如果获取到了锁,则返回空
*/
// 【1、尝试获取锁】
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
// tryAcquireAsync方法返回一个RFuture<Long>类型,get方法主要就是取得RFuture中的数值
// 该数值就是该锁还剩余的过期时间(如果为空,则表示已经获取到锁了,反之则表示该锁还剩多久过期)
// 见下面【2、异步尝试获取锁】
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 【2、异步尝试获取锁】
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 1、如果有设置锁过期时间
if (leaseTime != -1) {
// 调用tryLockInnerAsync,【通过lua脚本去加锁】,见下面【3、通过调用lua脚本去真正开始加锁】
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁)
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法
ttlRemainingFuture.addListener(new FutureListener<Long>() {
// 当获取锁的操作执行结束时,该方法被吊起
@Override
public void operationComplete(Future<Long> future) throws Exception {
// 3.1、如果获取锁失败,则直接返回
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
// ttlRemaining == null 则说明获取锁成功
if (ttlRemaining == null) {
// 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗)
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// 【3、通过调用lua脚本去真正开始加锁】
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 将锁过期时间转换为毫秒
internalLockLeaseTime = unit.toMillis(leaseTime);
// 通过lua脚本去获取锁(可重入锁)
// pttl命令和ttl命令类似,只是他是以毫秒为单位返回剩余过期时间,ttl是以秒为单位
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName())
, internalLockLeaseTime, getLockName(threadId));
}
③、await加锁最大等待时间方法分析
protected boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
// 具体实现见下面
return commandExecutor.await(future, timeout, timeoutUnit);
}
public boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
// 创建一个门栓
final CountDownLatch l = new CountDownLatch(1);
// 当订阅的锁被释放后会吊起这个监听方法,在监听方法内部将门栓数量减一
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 监听方法被吊起,门栓数量减一
l.countDown();
}
});
// 在这里等待门栓数量为0,或超时时间到了再继续运行
// 在等待时间内如果订阅的锁已经释放,监听方法会被吊起门栓数量为0,则这里返回true
// 如果等待时间已经耗完了,订阅的锁还没被释放的话,则这里返回false
return l.await(timeout, timeoutUnit);
}
四、释放锁方法分析
1、释放锁流程分析
释放锁的代码逻辑比较简单,这里只描述一下释放锁的lua脚本的大体流程即可。
2、释放锁源码分析
①、unlock
public void unlock() {
try {
// 释放锁的核心方法unlockAsync
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)e.getCause();
} else {
throw e;
}
}
}
②、unlockAsync
RFuture可以理解为对Future的一个增强,netty中的实现。JDK中的future只能阻塞获取子线程的返回,在netty中对future进行了增强,可以添加监听并且异步获取。
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
// 1、具体的释放锁的lua脚本(释放锁的动作在这里完成)
RFuture<Boolean> future = unlockInnerAsync(threadId);
// 2、添加一个监听器,一旦释放锁的操作完成(无论失败或成功),都会吊起监听器的operationComplete方法
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
// 3、如果释放锁失败了
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
if (opStatus) {
cancelExpirationRenewal(null);
}
// 4、释放锁成功
result.trySuccess(null);
}
});
return result;
}
③、unlockInnerAsync
// 释放锁的lua脚本,这个lua脚本很简单不过多解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
五、遗留问题
1、如果自己设置了锁过期时间那么redisson就不会开启看门狗去延长过期时间了吗?
关于这个问题,从源代码中可以看到如果自己设置了锁过期时间那么会直接调用
tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)
方法去获取锁,获取到了以后就直接返回了,并没有看到在哪里会去开启一个看门狗。如果是这样的话,那么是不是在我们调用
tryLock
方法的时候,如果自己指定了锁的过期时间,是不是就意味着没有定时线程(看门狗)去定期的延长锁的过期时间了???那这样是不是就不能保证分布式锁的安全性了????
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 1、如果有设置锁过期时间
if (leaseTime != -1) {
// 调用tryLockInnerAsync,通过lua脚本去加锁【这里加锁成功后就直接返回了,并没有添加看门狗延长过期时间????】
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁)
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法
ttlRemainingFuture.addListener(new FutureListener<Long>() {
// 当获取锁的操作执行结束时,该方法被吊起
@Override
public void operationComplete(Future<Long> future) throws Exception {
// 3.1、如果获取锁失败,则直接返回
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// ttlRemaining == null 则说明获取锁成功
if (ttlRemaining == null) {
// 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗)
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
六、参考文章
1、慢谈 Redis 实现分布式锁 以及 Redisson 源码解析
2、RedissonMultiLock + RedissonLock部分源码