目录
Redisson源码分析
上篇文章链接: https://blog.csdn.net/QiuHaoqian/article/details/114301895.
中说过,分布式锁的核心功能其实就三个:加锁、解锁、设置锁超时。这三个功能也是研究Redisson分布式锁原理的方向。
Redis 发布订阅
在正式开始之前,有必要先了解一个知识点,就是有关Redis的发布订阅功能。
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息,发布者可以向指定的渠道 (channel) 发送消息,订阅者如果订阅了该频道的话就能收到消息,从而实现多个客户端的通信效果。
在使用Redisson加锁之前,需要先获取一个RLock实例对象,有了这个对象就可以调用lock、tryLock方法来完成加锁的功能;
Config config = new Config();
config.useSingleServer()
.setPassword("")
.setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// RLock对象
RLock lock = redisson.getLock("myLock");
RLock是一个接口,具体的同步器需要实现该接口,当调用redisson.getLock()时,程序会初始化一个默认的同步执行器 RedissonLock 。
这里面初始化了几个参数:
commandExecutor ------------ 异步的Executor执行器,Redisson中所有的命令都是通过…Executor 执行的 ;
id ------------------------------------- 唯一ID,初始化的时候是用UUID创建的;
internalLockLeaseTime ------- 等待获取锁时间,这里读的是配置类中默认定义的,时间为30秒;
同时,图片里还标注了一个方法getEntryName,返回的是 “ID :锁名称” 的字符串,代表的是当前线程持有对应锁的一个标识,这些参数有必要留个印象,后面的源码解析中经常会出现。 说完了初始化的东西,我们就可以开始学习加锁和解锁的源码了。
加锁
Redisson的加锁方法有两个,tryLock 和 lock,使用上的区别在于 tryLock 可以设置锁的过期时长 leaseTime 和等待时长 waitTime,核心处理的逻辑都差不多,我们先从tryLock讲起。
tryLock()
/**
* @param waitTime 等待锁的时长
* @param leaseTime 锁的持有时间
* @param unit 时间单位
* @return
* @throws InterruptedException
*/
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 剩余的等待锁的时间
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果没取到锁,则返回锁的剩余超时时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// ttl为null,说明可以抢到锁了,返回true
if (ttl == null) {
return true;
}
// 如果waitTime已经超时了,就返回false,代表申请锁失败
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅分布式锁, 解锁时进行通知,看,这里就用到了我们上面说的发布-订阅了吧
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待锁释放,await()返回false,说明等待超时了
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
// 等待都超时了,直接取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 进入死循环,反复去调用tryAcquire尝试获取锁,跟上面那一段拿锁的逻辑一样
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
代码还是挺长的,不过流程也就两步,要么线程拿到锁返回成功;要么没拿到锁并且等待时间还没过就继续循环拿锁,同时监听锁是否被释放。
拿锁的方法是 tryAcquire() ,传入的参数分别是 锁的持有时间,时间单位、代表当前线程的ID,跟进代码查看调用栈,它会调到一个叫做tryAcquireAsync的方法:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
// 如果有设置锁的等待时长的话,就直接调用tryLockInnerAsync方法获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 没有设置等待锁的时长的话,加多一个监听器,也就是调用lock.lock()会跑的逻辑,后面会说
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
继续跟,看看 tryLockInnerAsync() 方法的源码:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
String getLockName(long threadId) {
return id + ":" + threadId;
}
这里就是底层的调用栈了,直接操作命令,整合成lua脚本后,调用netty的工具类跟redis进行通信,从而实现获取锁的功能。
这段脚本命令还是有点意思的,简单解读一下:
- 先用 exists key 命令判断是否锁是否被占据了,没有的话就用 hset 命令写入,key为锁的名称,field为“客户端唯一ID:线程ID”,value为1;
- 锁被占据了,判断是否是当前线程占据的,是的话value值加1;
- 锁不是被当前线程占据,返回锁剩下的过期时长;
命令的逻辑并不复杂,但不得不说,作者的设计还是很有心的,用了redis的Hash结构存储数据,如果发现当前线程已经持有锁了,就用hincrby命令将value值加1,value的值将决定释放锁的时候调用解锁命令的次数,达到实现锁的可重入性效果。
命令对应的逻辑图中所示:
继续跟代码吧,根据上面的命令可以看出,如果线程拿到锁的话,tryLock方法会直接返回true,万事大吉。
拿不到的话,就会返回锁的剩余过期时长,这个时长有什么作用呢?我们回到tryLock方法中死循环的那个地方:
这里有一个针对 waitTime 和 key 的剩余过期时间大小的比较,取到二者中比较小的那个值,然后用Java的Semaphore信号量的tryAcquire方法来阻塞线程。
那么Semaphore信号量又是由谁控制呢,何时才能release呢。这里又需要回到上面来看,各位应该还记得,我们上面贴的tryLock代码中还有这一段:
订阅的逻辑显然是在subscribe方法里,跟着方法的调用链,它会进入到PublishSubscribe.Java中:
这段代码的作用在于将当前线程的 threadId 添加到一个 AsyncSemaphore 中,并且设置一个 redis 的监听器,这个监听器是通过 redis 的发布、订阅功能实现的。
一旦监听器收到 redis 发来的消息,就从中获取与当前 thread 相关的,如果是锁被释放的消息,就立马通过操作Semaphore(也就是调用release方法)来让刚才阻塞的地方释放。
释放后线程继续执行,仍旧是判断是否已经超时。如果还没超时,就进入下一次循环再次去获取锁,拿到就返回true,没有拿到的话就继续流程。
这里说明一下,之所以要循环,是因为锁可能会被多个客户端同时争抢,线程阻塞被释放之后的那一瞬间很可能还是拿不到锁,但是线程的等待时间又还没过,这个时候就需要重新跑循环去拿锁。
tryLock获取锁的整个过程如图:
lock()
除了tryLock,一般我们还经常直接调用lock来获取锁,lock的拿锁过程跟tryLock基本是一致的,区别在于lock没有手动设置锁过期时长的参数,该方法的调用链也是跑到tryAcquire方法来获取锁的,不同的是,它会跑到这部分的逻辑:
这段代码做了两件事:
1、预设30秒的过期时长,然后去获取锁
2、开启一个监听器,如果发现拿到锁了,就开启定时任务不断去刷新该锁的过期时长
刷新过期时长的方法是scheduleExpirationRenewal,源码如下:
private void scheduleExpirationRenewal(final long threadId) {
// expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
代码的流程比较简单,大概就是开启一个定时任务,每隔internalLockLeaseTime / 3的时间(这个时间是10秒)就去检测锁是否还被当前线程持有,是的话就重新设置过期时长internalLockLeaseTime,也就是30秒的时间。
而这些定时任务会存储在一个ConcurrentHashMap对象expirationRenewalMap中,存储的key就为“线程ID:key名称”,如果发现expirationRenewalMap中不存在对应当前线程key的话,定时任务就不会跑,这也是后面解锁中的一步重要操作。
上面这段代码就是Redisson中所谓的”看门狗“程序,用一个异步线程来定时检测并执行的,以防手动解锁之前就过期了。
其他的逻辑就跟tryLock()基本没什么两样啦。
解锁
unlock()
Redisson分布式锁解锁的上层调用方法是unlock(),默认不用传任何参数
@Override
public void unlock() {
// 发起释放锁的命令请求
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
// 成功释放锁,取消"看门狗"的续时线程
cancelExpirationRenewal();
}
}
解锁相关的命令操作在unlockInnerAsync方法中定义:
又是一大串的lua脚本,比起前面加锁那段脚本的命令稍微复杂了点,不过没关系,我们简单梳理一下,命令的逻辑大概是这么几步:
- 判断锁是否存在,不存在的话用 publish 命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
- 锁存在但不是当前线程持有,返回空置nil;
- 当前线程持有锁,用 hincrby 命令将锁的可重入次数 -1,然后判断重入次数是否大于 0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回 1;
当线程完全释放锁后,就会调用cancelExpirationRenewal()方法取消"看门狗"的续时线程
void cancelExpirationRenewal() {
// expirationRenewalMap移除对应的key,就不会执行当前线程对应的"看门狗"程序了
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
分布式锁的流程图如下:
RedLock算法实现的锁
以上就是Redisson分布式锁的原理讲解,总的来说,就是简单的用lua脚本整合基本的set命令实现锁的功能,这也是很多Redis分布式锁工具的设计原理。除此之外,Redisson还支持用 “RedLock算法” 来实现锁的效果,这个工具类就是RedissonRedLock。
用法也很简单,创建多个Redisson Node, 由这些无关联的Node就可以组成一个完整的分布式锁
RLock lock1 = Redisson.create(config1).getLock(lockKey);
RLock lock2 = Redisson.create(config2).getLock(lockKey);
RLock lock3 = Redisson.create(config3).getLock(lockKey);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
redLock.lock();
} finally {
redLock.unlock();
}
RedLock算法原理方面我就不细说了,可以看之前的文章,简单的说就是能一定程度上能有效防止 Redis 实例单点故障的问题,但并不完全可靠,不管是哪种设计,光靠Redis本身都是无法保证锁的强一致性的。还是那句话,鱼和熊掌不可兼得,性能和安全方面也往往如此,Redis强大的性能和使用的方便足以满足日常的分布式锁需求,如果业务场景对锁的安全隐患无法忍受的话,最保底的方式就是在业务层做幂等处理。