Redisson 源码初探 (六)公平锁

因为Redisson 默认是非公平锁,client 端互相一起争抢,现在我们继续研究公平锁,为什么要研究?研究分布式锁 不仅仅要研究最基础的锁对吧,我们要把一系列的非公平锁 公平锁 读写锁 RedLock锁,Semaphore CountDownLatch  一系列的研究完,才算真正的研究了分布式锁,对吧

那么公平锁呢,我们知道公平锁需要维持一个有序的获取锁的顺序,可以使用队列也可以使用一些其他的机制,那么我们慢慢来看Redisson的公平锁是如何实现的?

先找到入口

 public static void main(String[] args) throws Exception {
    	//构建一个配置信息对象
        Config config = new Config();
        config.useClusterServers()
                //定时扫描连接信息 默认1000ms
                .setScanInterval(2000)
                .addNodeAddress("redis://127.0.0.1:7001");
		//因为Redisson 是基于redis封装的一套便于复杂操作的框架
		//所以这里构建对象肯定是创建一些与redis的连接
        RedissonClient redisson = Redisson.create(config);
		//这里是重点 获取锁,这也是重点分析的地方
        //这里获取公平锁
        RLock lock = redisson.getFairLock("lock");
        //尝试获取锁,这里其实和之前的非公平锁 基本一样
        //那么公平锁最主要的逻辑在于有序 应该会有一个队列或者其他机制来维持有序
        //那么我们知道了类似点,同时我们去看看有哪些地方不一样
        lock.lock();
        //释放锁
        lock.unlock();
    }

Redisson 源码初探 (六)公平锁

 

我们可以看到RedissonFairLock 其实是我们RedissonLock 的一个子类,前面的逻辑都是一样的

@Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
@Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
          //尝试上锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //真正的尝试获取锁,执行lua脚本,这里是RedissonFairLock 不同的地方,子类进行了重写

        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

其实区别主要在执行lua脚本这里,这里RedissonFairLock 重写了父类的 tryLockInnerAsync()方法,那么重写了什么内容?

内容有点多,重点还是看lua脚本 调试源码 if (command == RedisCommands.EVAL_LONG) 才是真正的执行逻辑

KEYS : Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)

AVGS : internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime

KEYS[1] = getName() 其实就是锁的名称 lockName

KEYS[2] =  thredsQueueName = redisson_lock_queue:{lockName} 基于redis  list 结构实现的一个队列

KEYS[3] = timeoutSetName =  redisson_lock_timeout:{lockName}  基于redis 数据结构实现的一个set集合,有序的集合,可以自动按照每个数据指定一个分数(score)来进行排序

AVGS[1] = internalLockLeaseTime 就是lock 的契约时间,默认是30S

AVGS[2] = getLockName(threadId) 其实就是UUID + ":" + ThreadId

AVGS[3] = currentTime + threadWaitTime,当前时间 + 线程等待时间 默认 5000ms,最后得到一个时间 代表什么意义?

AVGS[4] = currentTime 当前时间

@Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads
                    "while true do "
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "

                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[3]) then "
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"
                    + 
                    "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                            + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
                            "redis.call('lpop', KEYS[2]); " +
                            "redis.call('zrem', KEYS[3], ARGV[2]); " +
                            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        "return 1;", 
                    Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), 
                    internalLockLeaseTime, getLockName(threadId), currentTime);
        }
        //如果你去debug 源码,你会知道其实走的是这块逻辑,而不是上面那一块
        if (command == RedisCommands.EVAL_LONG) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads
                     //一进来 就是一个死循环
                    "while true do "

                        //KEY[2] 是什么threadsQueueName 就是队列的名称
                        // lindex htreadsQueueName 0 什么意思?就是从队列中拿出第一个元素 
                  
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    //如果不存在 就直接会break掉 直接跳出循环
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "
                        //从有序set 集合中拿去第一个元素 看是否过期
                        //其实就是去把一些过期的key清除掉
                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[4]) then "
                        //从set 集合中 以及 queue中移除代表客户端的元素
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"
                    

                       //如果没有人加锁 锁key 和 队列都不存在 或者 队列的第一个元素是当前线程标志 满足其中一个条件
                      + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                            + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
                            //弹出第一个元素
                            "redis.call('lpop', KEYS[2]); " +
                            //从set集合中删除当前线程元素
                            "redis.call('zrem', KEYS[3], ARGV[2]); " +
                            //进行加锁 ,就是一个map数据结构
                            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                            //设置过期时间
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        //如果是当前线程持有锁,那么重入的时候 就会将对应的value + 1,白哦名重入的次数 
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            //重写设置过期时间
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                          //就会进入排队逻辑
                        "local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
                        "local ttl; " + 
                        //如果第一个元素不为空 同时当前线程不是第一个线程 就会进入排队 或者更新score
                        "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + 
                            //根据第一个线程的过期时间 - 当前时间 比如 10:00:25 - 10:00:05 = 20S
                            "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
                        "else "
                               //获取锁剩余存活时间
                          + "ttl = redis.call('pttl', KEYS[1]);" + 
                        "end; " + 

                            //timeout = ttl + 当前时间 + 5000ms
                        "local timeout = ttl + tonumber(ARGV[3]);" + 
                        //放入zset 有序集合中(过期时间作为分数)这里会存在两种情况对吧
//(1)如果不存在zset这个数据集合中 那么就会返回1,就会同时放入到 队列中
//(2)如果存在了元素,那么这个zadd key score value 那么就是更新分数,返回0,这时就不会再放入到队列中
                        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                            "redis.call('rpush', KEYS[2], ARGV[2]);" +
                        "end; " +
                        "return ttl;", 
                        Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), 
                                    internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
        }
        
        throw new IllegalArgumentException();
    }

这里我们要注意,我们的公平锁,如果某个线程持有线程过长,可能会导致队列被重写排序,为什么?

我们注意两个关键存储数据的结构 queue 和 zset,

在开始的时候,就会从zset中拿去score 判断是否timeout,如果timeout,对吧,会将队列中的元素 pop 以及 set中的对应的元素移除,但是这并不代表对应的线程不进行争夺锁了哦,

那些对应的线程会根据激活时间重写争夺锁,这个时候就会重写将自己放入到queue中 以及 zset中,因为GC 或者其他原因  有可能会导致延迟,从而导致 被顺序被重排,所有严格意义上来说,公平锁,并不是完全公平 

 

 

 

 

 

 

上一篇:requests库


下一篇:Linux 总线、设备、驱动模型 与 设备树