原文链接:https://www.changxuan.top/?p=1238
在上一篇文章(一把简单的“锁”)中,我们发现了目前锁存在两个问题:
- 某个线程所持有的锁可以被其它线程随意释放掉
- 目前锁还不支持可配置的阻塞/非阻塞锁
注:当然不仅仅是存在这两个问题
首先,我们来看第一个问题。之所以存在这种问题,是因为在释放锁的时候只要知道这把锁的名称(key)就能释放成功了。就好比你进卧室后,在卧室门上加了一把“卧室锁”(key),然后有其他人想进的时候就说释放"卧室锁",然后门就开了。所以,这把锁也太不安全了。想解决这个问题,我们可以在锁上再加点东西即标识,来区分到底是谁的加的锁。这样再去释放锁的时候,拿着key和标识来判断能不能释放锁。
在第一版的锁中,加锁的核心代码是这一行:
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "LOCK"));
我们可以看到,其中 value
的位置,我们只放了一个固定值 LOCK ,有些浪费空间。所以,这个地方可以用来存储一个唯一的标识。在释放锁的时候,用来判断能否释放。
那么,加锁(获取锁)的方法,我们可以改写成下面这样:
/**
* 加锁/获取锁
* @param key 锁名称
* @param identity 标识
* @return boolean 是否加锁成功
*/
public boolean lock(String key, String identity) {
boolean lock;
try {
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, identity));
} catch (Exception e) {
log.error("获取锁:{},出现错误{}", key, e);
return false;
}
return lock;
}
同时,释放锁的方法,改成这个样子:
/**
* 释放锁
* @param key 锁名称
* @param identity 标识
*/
public void unlock(String key, String identity) {
try {
Object obj = redisTemplate.opsForValue().get(key);
String value = obj != null ? obj.toString():null;
if (value != null){
if (value.equals(identity)) {
redisTemplate.delete(key);
}
}
}catch (Exception e) {
log.error("释放锁操作失败,key:" + key + "失败原因:",e);
}
}
通过上面的改造,我们只要在获取锁时同时加上一个唯一标识就能保证我们所持有的锁不会被其它线程恶意释放掉了。生成唯一标识的方式有很多种,可以根据自己的所依赖的工具库进行选择。例如,我在项目中常用的就是 hutool
工具包中的 StrUtil.uuid()
方法。
接下来,我们要解决第二个问题。我不想如果获取不到锁,直接就返回了 false
,做人要有恒心、有毅力,做锁也是。所以,当没有获取到锁的时候,需要让它一直尝试获取锁,是不是有点 自旋锁
那个意思了。当然,也不能无上限的尝试获取,做人要把握好一个度,做锁也是。这里有两个选择,一、可以设置最大尝试次数,二、可以设置超时时间。在一般的项目中,设置超时时间是比较实用的选择。而且,如果实现了一把能设置超时时间的锁,那么一定也能实现设置最大尝试次数的锁。所以,我们这里选择在 boolean lock(String key, String identity)
基础上实现一把能设置超时时间的锁。当然,获取锁的方法会增加一个参数[超时时长] sec
。实现代码如下:
/**
* 阻塞锁
* @param key 锁名称
* @param identity 标识
* @param sec 超时时长(秒)
* @return boolean 是否成功获取锁
*/
public boolean lockb(String key, String identity, int sec) {
try {
int count = 0;
while (!lock(key, identity)) {
Thread.sleep(100);
count++;
if (count > sec * 1000L / 100) {
if (sec != 0) {
log.warn("线程:{}获取锁{}超时", Thread.currentThread().getName(), key);
}
return false;
}
}
return true;
} catch (Exception e) {
log.error("获取锁:{},出现错误{}", key, e);
return false;
}
}
还记得我们在第一篇文章中的测试用例吧,测试用例中 count
的输出结果是 1000, 表明只有一个线程对 count
执行了1000次自增操作,其余九个线程没有获取到锁后就直接放弃了。现在我们,稍微修改一下测试代码使用 boolean lockb(String key, String identity, int sec)
方法让每个线程都有机会执行自增操作。
测试代码如下:
@WorkService.java
public static int count = 0;
private static final int TIME = 1000;
public void work() {
String key = "TEST_KEY";
String identity = StrUtil.uuid();
// 最大超时时间
int timeout = 20;
// 新获取锁方法
if (redisUtil.lockb(key, identity, timeout)) {
try {
increment();
}catch (Exception e) {
log.error("发生错误",e);
}finally {
// 新释放锁方法
redisUtil.unlock(key, identity);
}
} else {
log.info("线程:{},未获取到锁", Thread.currentThread().getName());
}
}
private void increment() {
for (int i = 0; i < TIME; i++) {
WorkService.count++;
}
}
测试(控制台输出)
count = 10000
如预期一样,每个线程都执行了 1000 次自增操作,10个线程执行完毕后 count
的值成为了 10000。
好了,现在我们已经成功了解决了之前提到的两个问题。那么再好好想想,是否还存在其它问题或者需求呢?
往往在生产环境中会出现各种意想不到的问题,假如我们的代码在执行 increment()
方法时,当前的这台机器突然断电了!那么这把锁,也将会是一直被持有的状态了。不出意外的话其它线程再也获取不到锁了,不出意外的话程序员小哥又要写检讨了... ...
另外,在分布式系统中由于子系统需要支持水平扩展,大都是以集群的形式进行部署的。每个节点中会存在同样的定时任务(例如每天执行一次),当系统部署成功后我并不想集群的每个节点都去跑一遍定时任务。可能你会说,你使用非阻塞锁不就行了。这样到开始执行定时任务时只有一个线程能够获取到锁,其它没获取到锁的线程就不会执行此次定时任务的业务代码了。 看来我需要再次重申一遍这句话了“往往在生产环境中会出现各种意想不到的问题”,万一集群有个节点的时间不准确,与其它节点的时间差刚好大于执行一次定时任务所消耗的时间呢?如果仍然使用之前的非阻塞锁,是不是在很短的时间内就执行了两次定时任务,这与你的预期不符了。可能你会说,执行两次就执行两次吧!这样的话,你可不是一个“好”程序员。做事情要严谨,要认真,一丝不苟,写代码也是,这样才能尽量避免系统少出事故。