分布式锁(1)Redisson之RedissonLock

RedissonLock

代码示例:

Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.31.114:7001").addNodeAddress("redis://192.168.31.184:7002");

RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("anyLock"); 
lock.lock();
lock.unlock();

Redisson#getLock()

public RLock getLock(String name) {
    //CommandExecutor:命令执行器,封装了一个redis连接的命令执行器,可以执行一些set、get redis的一些操作,用来执行底层的redis命令
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

RedissonLock构造方法

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    this.id = commandExecutor.getConnectionManager().getId();
    //初始化internalLockLeaseTime = 30秒
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

RedissonLock#lock()

public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

RedissonLock#lockInterruptibly()

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        //得到当前线程ID
        long threadId = Thread.currentThread().getId();
        //尝试获取锁,得到锁的存活时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        //说明加锁成功
        if (ttl == null) {
            return;
        }

        //加锁失败,走阻塞逻辑,发布一个订阅事件
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                //一直在尝试获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                //当获取锁成功,则会跳出循环
                if (ttl == null) {
                    break;
                }

                //说明锁还在被占用
                if (ttl >= 0) {
                    //等待一段时间,再次投入到while(true)死循环的逻辑内,尝试去获取锁
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

RedissonLock#tryAcquire()

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

RedissonLock#tryAcquireAsync()

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {   //如果设置了锁超时时间
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //加锁的核心代码,返回锁的存活时间
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    //tryLockInnerAsync执行完成,RFuture的监听器就会被触发执行的
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        //加锁成功
        if (ttlRemaining == null) {
            //执行调度任务
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

获取锁的核心逻辑:

RedissonLock#tryLockInnerAsync()

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    //evalWriteAsync会去选择master
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              //如果我们设置的锁anyLock不存在,说明可以加锁
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                   //设置一个map数据结构,key:anyLock,field:当前线程ID,value:1
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                   //给anyLock锁设置过期时间,默认是internalLockLeaseTime(30秒)
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //如果anyLock这个map中,存在key:当前线程ID,value:1
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  //给value加上1,即field:线程ID,value:2
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  //将anyLock这个key的有效期设置问了30秒
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //返回anyLock这个key当前还剩下的存活时间
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

调度任务的核心逻辑,加锁成功后延迟10秒执行。

RedissonLock#renewExpirationAsync()

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //判断key:anyLock,是否存在field:当前线程ID,value:1
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                 //将anyLock的存活时间重新设置为30秒
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        internalLockLeaseTime, getLockName(threadId));
}

RedissonLock#unlock()

public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } 
    }
}

释放锁的核心代码:

RedissonLock#unlockInnerAsync()

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //如果anyLock锁不存在
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                //发布一个消息
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //如果key:anyLock中,不存在当前线程加的这个锁,说明之前有其他线程加了锁
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //key:anyLock,field:当前线程的value减一
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            //如果counter>0,说明当前线程加了几次锁,重新将过期时间设置为30秒
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //counter=0,说明当前线程只加了一次锁
            "else " +
                //删除这个锁的key
                "redis.call('del', KEYS[1]); " +
                //发布一个消息
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

redisson 重入锁的原理:
分布式锁(1)Redisson之RedissonLock

上一篇:Java导入类/枚举内部内部类时导入顺序的重要性


下一篇:分布式锁(3)Redisson之MultiLock