1、场景
假设场景,多个线程并发(模拟并发)对库存数量进行扣减,现将库存预置在redis中,然后开启多线程对库存进行扣减
private static final String PRODUCT = "MoonCake";
private static final String PRODUCT_STOCK = PRODUCT + "Stock";
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisTemplate redisTemplate;
@GetMapping("/lockAdd")
@ApiOperation("加分布式锁")
public void lockAdd() throws Exception {
redisTemplate.opsForValue().set(PRODUCT_STOCK, 90);// 预置库存为90
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
decreaseStock();
} catch (InterruptedException e) {
log.error("error:", e);
}
}).start();
}
}
private void decreaseStock() throws InterruptedException {
//对数据进行加锁
RLock lock = redissonClient.getLock(PRODUCT);
if (lock.tryLock(5, TimeUnit.SECONDS)) {
// 获取最新的库存
Integer curStock = (Integer) redisTemplate.opsForValue().get(PRODUCT_STOCK);
log.info("Get from redis, curStock=" + curStock);
if (curStock > 0) {
curStock -= 1;
// 更新库存值
redisTemplate.opsForValue().set(PRODUCT_STOCK, curStock);
log.info("扣减成功,库存stock:" + curStock);
} else {
//没库存
log.info("扣减失败,库存不足");
}
//解锁
lock.unlock();
}
}
2、新建锁对象
获取锁时,一般需要锁的key,可以根据实际业务情况进行设置,这里设置为“MoonCake”
RLock lock = redissonClient.getLock(PRODUCT);
上面新建锁时,主要将name注入,生成一个锁对象RedissonLock
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
// 命令行执行器
this.commandExecutor = commandExecutor;
// id
this.id = commandExecutor.getConnectionManager().getId();
// 内部锁释放时间
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
// entry名,存储在redis中的形式
this.entryName = id + ":" + name;
// 锁状态的发布和订阅
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
3、获取锁
使用lock.tryLock可以获取锁,该方法的定义如下
/**
**@param waitTime: 获取锁的等待时间
**@param leaseTime:获取到锁之后的释放时间
**@param unit:时间单位
**/
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 等待时间转化为毫秒
long time = unit.toMillis(waitTime);
// 当前时间
long current = System.currentTimeMillis();
// 当前线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,返回ttl时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取到锁
if (ttl == null) {
return true;
}
// 没有获取到锁,更新等待时间=waitTime-刚才操作花费时间
time -= System.currentTimeMillis() - current;
// 等待时间到,没有获取到锁,获取失败
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 等待时间还没有用完
current = System.currentTimeMillis();
/**
* 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.
* 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
* 当 this.await 返回 true,进入循环尝试获取锁.
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
/**
* 收到锁释放的信号后,在最大等待时间之内,循环尝试获取锁
* 获取锁成功,则返回 true,
* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
*/
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
return true;
}
// 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
/**
* 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
*/
currentTime = System.currentTimeMillis();
// 当前锁的ttl>0并且ttl<剩余的等待时间
if (ttl >= 0 && ttl < time) {
// 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 在wait time 时间范围内等待可以通过信号量
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 计算获取锁的总耗时,总耗时大于获取锁的等待时间,返回获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
加锁之后在redis中的数据结构是hash类型
获取锁的Lua脚本
public class RedissonLock extends RedissonExpirable implements RLock {
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 脚本执行参数: 都是从下标1开始
// KEYS[1]=锁的key
// ARGV[1]=锁释放时间,ARGV[2]=当前线程Id
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 当前锁key存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成最新值
"return nil; " + // 返回nil
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 当前锁的key不存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 线程持有的锁计数+1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + //以毫秒为单位,将锁的TTL过期时间刷新成最新值
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", // 以毫秒为单位,返回锁的当前ttl过期时间
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
}
4、释放锁
解锁时的参数类似于:
getName() -> MoonCake
getChannelName() -> redisson_lock__channel:{MoonCake}
LockPubSub.UNLOCK_MESSAGE -> 0
internalLockLeaseTime -> 90000
getLockName(threadId) -> 1a2cef15-9816-4bea-89dc-1a8d9ad096ca:644
主要通过RedissonLock#unlockInnerAsync方法进行解锁,先判断锁是否被当前线程加的,如果不是,直接返回解锁失败;如果是,将当前线程持有的锁的计数器减1,获得减1之后的计数器值A;如果A大于0,代表锁被当前线程加了多次,将锁的过期时间刷新成传入的internalLockLeaseTime;如果A==0,代表锁要被释放了,删除锁,然后发送解锁消息给等待队列。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 脚本执行参数: 都是从下标1开始
// KEYS[1]=锁的key, KEYS[2]=消息通道名称
// ARGV[1]=解锁信息,ARGV[2]=锁释放时间,ARGV[2]=当前线程Id
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 当前线程持有锁?
"return nil;" + // 不是当前线程持有的锁
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 当前线程持有锁计数-1
"if (counter > 0) then " + // 锁重入
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 以毫秒为单位,将锁的TTL过期时间刷新成释放时间
"return 0; " + // 返回0
"else " + // 只锁定了一次,没有锁重入
"redis.call('del', KEYS[1]); " + // 删除锁
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁释放消息,通知等待队列
"return 1; " + // 返回1
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}