RedissonLock
代码示例:
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.31.114:7001").addNodeAddress("redis://192.168.31.184:7002");
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("anyLock");
lock.lock();
lock.unlock();
Redisson#getLock()
public RLock getLock(String name) {
//CommandExecutor:命令执行器,封装了一个redis连接的命令执行器,可以执行一些set、get redis的一些操作,用来执行底层的redis命令
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
RedissonLock构造方法
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
//初始化internalLockLeaseTime = 30秒
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
RedissonLock#lock()
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
RedissonLock#lockInterruptibly()
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//得到当前线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁,得到锁的存活时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
//说明加锁成功
if (ttl == null) {
return;
}
//加锁失败,走阻塞逻辑,发布一个订阅事件
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
//一直在尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
//当获取锁成功,则会跳出循环
if (ttl == null) {
break;
}
//说明锁还在被占用
if (ttl >= 0) {
//等待一段时间,再次投入到while(true)死循环的逻辑内,尝试去获取锁
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
RedissonLock#tryAcquire()
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
RedissonLock#tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { //如果设置了锁超时时间
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//加锁的核心代码,返回锁的存活时间
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//tryLockInnerAsync执行完成,RFuture的监听器就会被触发执行的
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
//加锁成功
if (ttlRemaining == null) {
//执行调度任务
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
获取锁的核心逻辑:
RedissonLock#tryLockInnerAsync()
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//evalWriteAsync会去选择master
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果我们设置的锁anyLock不存在,说明可以加锁
"if (redis.call('exists', KEYS[1]) == 0) then " +
//设置一个map数据结构,key:anyLock,field:当前线程ID,value:1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//给anyLock锁设置过期时间,默认是internalLockLeaseTime(30秒)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果anyLock这个map中,存在key:当前线程ID,value:1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//给value加上1,即field:线程ID,value:2
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//将anyLock这个key的有效期设置问了30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//返回anyLock这个key当前还剩下的存活时间
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
调度任务的核心逻辑,加锁成功后延迟10秒执行。
RedissonLock#renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断key:anyLock,是否存在field:当前线程ID,value:1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//将anyLock的存活时间重新设置为30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
RedissonLock#unlock()
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
}
}
}
释放锁的核心代码:
RedissonLock#unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果anyLock锁不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//发布一个消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果key:anyLock中,不存在当前线程加的这个锁,说明之前有其他线程加了锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//key:anyLock,field:当前线程的value减一
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果counter>0,说明当前线程加了几次锁,重新将过期时间设置为30秒
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//counter=0,说明当前线程只加了一次锁
"else " +
//删除这个锁的key
"redis.call('del', KEYS[1]); " +
//发布一个消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
redisson 重入锁的原理: