redis分布式锁实现---源码分析

一、回顾一下分布式锁的基本使用方式

①、注入redissonClient客户端

②、通过redissonClient客户端获取锁对象rLock

③、通过rLock尝试获取锁

// ①、注入redissonClient客户端
@Autowired
private RedissonClient redissonClient;

public boolean getLock() {
    // ②、通过redissonClient客户端获取锁对象rLock(RedissonLock实现默认是可重入锁)
    rLock = redissonClient.getLock(lockInfo.getName());
    try {
        // ③、通过rLock尝试获取锁
        return rLock.tryLock(lockInfo.getWaitTime(), lockInfo.getLeaseTime(), TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        log.info("获取可重入锁时线程被意外中断,锁名称:{},异常信息:{}", lockInfo.getName(), e);
    }
    return false;
}

二、redisson实现分布式锁结构介绍

1、redisson锁实现的基本特性

  • redisson对Redis的各种命令操作和特性都做了很好的封装和扩展,是一个非常灵活易用的Java中间件。
  • 在redisson实现分布式锁这块最核心的类是RedissonLock
  • RedissonLock类提供了可重入锁的实现。
  • redisson实现了公平锁(RedissonFairLock)、读锁(RedissonReadLock)、写锁(RedissonWriteLock)、可重入锁(RedissonLock)、联锁(RedissonMultiLock)、红锁(RedissonRedLock

2、锁实现的继承关系和子类实现

上面说到redisson实现分布式锁最核心的类是RedissonLock(很多锁都是基于该类实现的)(红锁和联锁后面介绍),那么下面看看RedissonLock的继承关系和子类以及类的结构,以便于更好的理解源码。

①、RedissonLock继承关系

redis分布式锁实现---源码分析

②、RedissonLock的实现类

redissonLock类作为一个通用的模板类(默认提供可重入锁实现),其他很多类型的锁都基于该类去实现不同的加锁方式。

可以看到RedissonLock类有四个子类实现,这些子类分别覆写了加锁释放锁的核心方法。

redis分布式锁实现---源码分析

三、加锁方法分析

1、说明

  • 上面理清了RedissonLock锁的继承以及实现关系,下面我们就找一个类型的锁实现来分析加锁解锁源码!这里以RedissonLock默认实现的可重入锁为例进行源码分析。
  • RedissonLock实现了多种加锁的接口,比如:带等待时间和释放时间的加锁接口(tryLock(long waitTime, long leaseTime, TimeUnit unit))、只带等待时间的加锁接口(tryLock(long waitTime, TimeUnit unit))、既不带等待时间也不带锁释放时间的加锁接口(tryLock())、只带释放时间没有等待时间的接口(lock(long leaseTime, TimeUnit unit))等,这里我们挑选一个参数最多的tryLock(long waitTime, long leaseTime, TimeUnit unit)方法来分析。
  • 源码分析的方式,按照先画整体流程图,对整个加锁解锁流程有大致理解后再分析源码。

2、加锁整体流程分析

redis分布式锁实现---源码分析

3、源码分析

下面所有的源码都来自于RedssionLock类!!!

①、tryLock方法

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  // 1、将等待时间转换为毫秒、、获取当前时间、获取当前线程ID
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  final long threadId = Thread.currentThread().getId();
  // 2、尝试申请锁,返回还剩余的锁过期时间【加锁核心方法,下面(②、加锁核心方法tryAcquire分析)】
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // 3、ttl==null 表示获取锁成功则直接返回true
  // lock acquired
  if (ttl == null) {
    return true;
  }

  // 4、获取还需要等待的时间,且根据还需等待的时间(time)判断是否获取锁失败
  time -= (System.currentTimeMillis() - current);
  // 如果还需要等待的时间为0,则说明获取锁已经失败了
  // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败
  if (time <= 0) {
    acquireFailed(threadId);
    return false;
  }

  // 重新获取当前时间
  current = System.currentTimeMillis();
  // 5、上面第一次尝试获取锁失败,且还没有超出最大等待时间的基础上,基于Redis的发布订阅机制,订阅锁释放事件
  final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  /*
   * 6、基于Redis的发布订阅机制,订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
   * 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
   * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
   * 当 this.await返回true,进入下面的循环再次尝试获取锁
   *
   * await是通过CountDownLatch + 监听器机制来实现的,具体看方法内部注释,见下面【③、await加锁最大等待时间方法分析】
   */
  if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
    // 在等待时间耗完的情况下,取消对该锁的订阅
    if (!subscribeFuture.cancel(false)) {
      subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
        @Override
        public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
          if (subscribeFuture.isSuccess()) {
            unsubscribe(subscribeFuture, threadId);
          }
        }
      });
    }
    // 获取锁失败
    acquireFailed(threadId);
    return false;
  }

  // 7、如果在等待时间内订阅的锁已经被释放了,则会执行这里
  try {
    // 获取还需要等待的时间
    time -= (System.currentTimeMillis() - current);
    // 如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
    if (time <= 0) {
      acquireFailed(threadId);
      return false;
    }

    // 8、能运行到这里则说明:
    // 1、当前还在最大等待时间内
    // 2、并且等待的锁已经被释放(即对该锁的订阅事件已经被吊起过),在这里可以再次尝试获取锁
    // 这是一个死循环,循环退出条件有两个:
    // ①、在最大等待时间内成功获取锁,返回true
    // ②、超出了最大等待时间,但仍然没有成功获取到锁,返回false
    while (true) {
      // 获取当前时间
      long currentTime = System.currentTimeMillis();
      // 8.1、再次尝试申请锁,返回还剩余的锁过期时间
      ttl = tryAcquire(leaseTime, unit, threadId);
      // 8.2、ttl==null 表示获取锁成功则直接返回true
      // lock acquired
      if (ttl == null) {
        return true;
      }

      // 再次计算还需要等待多时时间
      time -= (System.currentTimeMillis() - currentTime);
      // 8.3、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
      if (time <= 0) {
        acquireFailed(threadId);
        return false;
      }

      // waiting for message
      // 更新一下当前时间,因为上面的操作可能会耗时,进而导致下面根据currentTime计算的time不准确
      currentTime = System.currentTimeMillis();
      if (ttl >= 0 && ttl < time) {
        // 8.4、如果剩余时间(ttl)小于waittime ,就在 ttl 时间内,从Entry的信号量(Semaphore)获取一个许可(除非被中断或者一直没有可用的许可)。
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
        // 8.5、如果该锁剩余过期时间(ttl)大于waittime,则就在waittime 时间范围内等待可以通过信号量(Semaphore)
        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
      }

      // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
      time -= (System.currentTimeMillis() - currentTime);
      // 8.6、如果还需要等待的时间小于0,则说明已经超过最大等待时间,获取锁失败
      if (time <= 0) {
        acquireFailed(threadId);
        return false;
      }
    }
  } finally {
    // 9、无论是否获得锁,都要取消订阅解锁消息
    unsubscribe(subscribeFuture, threadId);
  }

}

②、加锁核心方法tryAcquire分析

/**
 * 尝试获取锁,如果没有获取到锁则返回该锁还剩余多少毫秒过期,如果获取到了锁,则返回空
 */
// 【1、尝试获取锁】
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // tryAcquireAsync方法返回一个RFuture<Long>类型,get方法主要就是取得RFuture中的数值
    // 该数值就是该锁还剩余的过期时间(如果为空,则表示已经获取到锁了,反之则表示该锁还剩多久过期)
  	// 见下面【2、异步尝试获取锁】
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

// 【2、异步尝试获取锁】
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 1、如果有设置锁过期时间
    if (leaseTime != -1) {
      // 调用tryLockInnerAsync,【通过lua脚本去加锁】,见下面【3、通过调用lua脚本去真正开始加锁】
      return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁)
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
      // 当获取锁的操作执行结束时,该方法被吊起
      @Override
      public void operationComplete(Future<Long> future) throws Exception {
        // 3.1、如果获取锁失败,则直接返回
        if (!future.isSuccess()) {
          return;
        }

        Long ttlRemaining = future.getNow();
        // lock acquired
        // ttlRemaining == null 则说明获取锁成功
        if (ttlRemaining == null) {
          // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗)
          scheduleExpirationRenewal(threadId);
        }
      }
    });
    return ttlRemainingFuture;
}

// 【3、通过调用lua脚本去真正开始加锁】
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 将锁过期时间转换为毫秒
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // 通过lua脚本去获取锁(可重入锁)
    // pttl命令和ttl命令类似,只是他是以毫秒为单位返回剩余过期时间,ttl是以秒为单位
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          "return redis.call('pttl', KEYS[1]);",
                                          Collections.<Object>singletonList(getName())
                                          , internalLockLeaseTime, getLockName(threadId));
 }

③、await加锁最大等待时间方法分析

protected boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
    // 具体实现见下面
  	return commandExecutor.await(future, timeout, timeoutUnit);
}

public boolean await(RFuture<?> future, long timeout, TimeUnit timeoutUnit) throws InterruptedException {
    // 创建一个门栓
    final CountDownLatch l = new CountDownLatch(1);
    // 当订阅的锁被释放后会吊起这个监听方法,在监听方法内部将门栓数量减一
    future.addListener(new FutureListener<Object>() {
      @Override
      public void operationComplete(Future<Object> future) throws Exception {
        // 监听方法被吊起,门栓数量减一
        l.countDown();
      }
    });
    // 在这里等待门栓数量为0,或超时时间到了再继续运行
    // 在等待时间内如果订阅的锁已经释放,监听方法会被吊起门栓数量为0,则这里返回true
    // 如果等待时间已经耗完了,订阅的锁还没被释放的话,则这里返回false
    return l.await(timeout, timeoutUnit);
}

四、释放锁方法分析

1、释放锁流程分析

释放锁的代码逻辑比较简单,这里只描述一下释放锁的lua脚本的大体流程即可。

redis分布式锁实现---源码分析

2、释放锁源码分析

①、unlock

public void unlock() {
  try {
    // 释放锁的核心方法unlockAsync
    get(unlockAsync(Thread.currentThread().getId()));
  } catch (RedisException e) {
    if (e.getCause() instanceof IllegalMonitorStateException) {
      throw (IllegalMonitorStateException)e.getCause();
    } else {
      throw e;
    }
  }
}

②、unlockAsync

RFuture可以理解为对Future的一个增强,netty中的实现。JDK中的future只能阻塞获取子线程的返回,在netty中对future进行了增强,可以添加监听并且异步获取。

public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    // 1、具体的释放锁的lua脚本(释放锁的动作在这里完成)
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    // 2、添加一个监听器,一旦释放锁的操作完成(无论失败或成功),都会吊起监听器的operationComplete方法
    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            // 3、如果释放锁失败了
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }

            Boolean opStatus = future.getNow();
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            if (opStatus) {
                cancelExpirationRenewal(null);
            }

            // 4、释放锁成功
            result.trySuccess(null);
        }
    });

    return result;
}

③、unlockInnerAsync

// 释放锁的lua脚本,这个lua脚本很简单不过多解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

五、遗留问题

1、如果自己设置了锁过期时间那么redisson就不会开启看门狗去延长过期时间了吗?

关于这个问题,从源代码中可以看到如果自己设置了锁过期时间那么会直接调用tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)方法去获取锁,获取到了以后就直接返回了,并没有看到在哪里会去开启一个看门狗。

如果是这样的话,那么是不是在我们调用tryLock方法的时候,如果自己指定了锁的过期时间,是不是就意味着没有定时线程(看门狗)去定期的延长锁的过期时间了???那这样是不是就不能保证分布式锁的安全性了????

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 1、如果有设置锁过期时间
    if (leaseTime != -1) {
        // 调用tryLockInnerAsync,通过lua脚本去加锁【这里加锁成功后就直接返回了,并没有添加看门狗延长过期时间????】
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2、如果获取锁时没有传递锁过期时间,则这里会给个默认过期时间30s(通过执行lua脚本去获取锁)
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 3、给获取锁的操作添加一个监听,当获取锁的操作返回时(不管成功还是失败),立即调用监听方法
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        // 当获取锁的操作执行结束时,该方法被吊起
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            // 3.1、如果获取锁失败,则直接返回
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // ttlRemaining == null 则说明获取锁成功
            if (ttlRemaining == null) {
                // 3.2、如果获取锁成功了,则开启定时任务去定时延长锁过期时间(看门狗)
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

六、参考文章

1、慢谈 Redis 实现分布式锁 以及 Redisson 源码解析

2、RedissonMultiLock + RedissonLock部分源码

3、redisson-2.10.4源代码分析

4、redis客户端redisson实战

上一篇:分布式锁Redission


下一篇:Redisson分布式锁源码解读