一.分布式锁
-
什么是分布式锁?
要介绍分布式锁,首先要提到与分布式锁相对应的是线程锁、进程锁。
线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问,也就是保证同一时间只能有一个客户端对共享资源进行操作。 -
分布式锁实现原理:
①.互斥:在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。
②.防止死锁:在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
③.性能:对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。所以在锁的设计时,需要考虑两点。
⑴.锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
⑵.锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。
④.重入:我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。 -
redis实现分布式锁原理
redis分布式锁是基于SetNx命令实现的,setNx命令每次都会检查key是否存在,不存在则创建并返回1,存在则不创建返回0,也就是1表示创建成功,0表示创建失败,因为在redis中可以保证key的唯一性,当多个线程同时创建同一个key的时候只有一个线程能创建成功,那个线程创建成功了那个线程就获取了锁,获取锁的时候需要给锁设置一个有效期,防止死锁问题,同时当执行完业务逻辑时要释放锁。 -
分布式锁应用场景
①.分布式任务调度,我们可以使用分布式锁确保只有一个任务调度执行;
②.分布式全局id的创建,不过这个性能会低;
二.基于redis实现分布式锁
- 为保证原子性,使用jedis操作redis创建redisUtils工具类
public class RedisUtils {
private static String IP = "192.168.241.143";
// Redis的端口号
private static int PORT = 6379;
// 可用连接实例的最大数目,默认值为8;
// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
private static int MAX_ACTIVE = 100;
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
private static int MAX_IDLE = 20;
// 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int MAX_WAIT = 3000;
private static int TIMEOUT = 3000;
// 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
private static boolean TEST_ON_BORROW = true;
// 在return给pool时,是否提前进行validate操作;
private static boolean TEST_ON_RETURN = true;
private static JedisPool jedisPool = null;
/**
* redis过期时间,以秒为单位
*/
public final static int EXRP_HOUR = 60 * 60; // 一小时
public final static int EXRP_DAY = 60 * 60 * 24; // 一天
public final static int EXRP_MONTH = 60 * 60 * 24 * 30; // 一个月
/**
* 初始化Redis连接池
*/
private static void initialPool() {
try {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(MAX_ACTIVE);
config.setMaxIdle(MAX_IDLE);
config.setMaxWaitMillis(MAX_WAIT);
config.setTestOnBorrow(TEST_ON_BORROW);
// ip 地址和 redis 密码记得修改成自己的
jedisPool = new JedisPool(config, IP, PORT, TIMEOUT, "root123456");
} catch (Exception e) {
// logger.error("First create JedisPool error : "+e);
e.getMessage();
}
}
/**
* 在多线程环境同步初始化
*/
private static synchronized void poolInit() {
if (jedisPool == null) {
initialPool();
}
}
/**
* 同步获取Jedis实例
*
* @return Jedis
*/
public synchronized static Jedis getJedis() {
if (jedisPool == null) {
poolInit();
}
Jedis jedis = null;
try {
if (jedisPool != null) {
jedis = jedisPool.getResource();
}
} catch (Exception e) {
e.getMessage();
// logger.error("Get jedis error : "+e);
}
return jedis;
}
/**
* 释放jedis资源
*
* @param jedis
*/
public static void returnResource(final Jedis jedis) {
if (jedis != null) {
jedis.close();
}
}
public static Long sadd(String key, String... members) {
Jedis jedis = null;
Long res = null;
try {
jedis = getJedis();
res = jedis.sadd(key, members);
} catch (Exception e) {
// logger.error("sadd error : "+e);
e.getMessage();
}
return res;
}
}
- 创建redisLock工具类
@Component
public class RedisLockUtils {
@Resource
private RedisTemplate<String, String> redisTemplate;
private final String SUCCESS = "OK";
private final String REDIS_LOCK_KEY = "redis:lock:";
/**
* 获取锁
*
* @param lockKey 锁的key也就是存在redis中的key
* @param lockValue 存储在redis中的vaule
* @param expireTime 锁过期时间 单位毫秒
* @param outTime 获取锁超时时间 单位毫秒
* @return
*/
public Boolean getLock(String lockKey, String lockValue, int expireTime, long outTime) {
Jedis jedis = null;
try {
jedis = RedisUtils.getJedis();
System.out.println(Thread.currentThread().getName() + "lockValue = " + lockValue);
long currentTimeMillis = System.currentTimeMillis();
long endTime = currentTimeMillis + outTime;
while (currentTimeMillis < endTime) {
/**
* EX: 设置超时时间,单位是秒
* PX: 设置超时时间,单位是毫秒
* NX: IF NOT EXIST 的缩写,只有 KEY不存在的前提下 才会设置值 XX:
* IF EXIST 的缩写,只有在 KEY存在的前提下 才会设置值
*/
SetParams params = new SetParams();
params.px(expireTime);
params.nx();
String result = jedis.set(createKey(lockKey), lockValue, params);
System.out.println(Thread.currentThread().getName() + "获取锁的结果:result=" + result);
if (SUCCESS.equalsIgnoreCase(result)) {
return true;
} else {
currentTimeMillis = System.currentTimeMillis();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
RedisUtils.returnResource(jedis);
}
}
return false;
}
private String createKey(String lockKey) {
return this.REDIS_LOCK_KEY + lockKey;
}
public Boolean unLock(String lockKey, String lockValue) {
Jedis jedis = RedisUtils.getJedis();
String script = "local clival = redis.call('get',KEYS[1] ) " +
"if(clival == ARGV[1]) then "+
"redis.call('del',KEYS[1]) " +
"return 'OK' " +
"else " +
"return nil " +
"end ";
List<String> keys = new ArrayList<String>();
keys.add(createKey(lockKey));
List<String> args = new ArrayList<String>();
args.add(lockValue);
String eval = (String) jedis.eval(script, keys, args);
if(jedis != null) {
jedis.close();
}
if(SUCCESS.equalsIgnoreCase(eval)) {
return true;
}
return false;
}
}
- 编写测试类测试
@RestController
public class TaskScheduling {
private final String TASK_SCHEDULING = "task";
@Autowired
private RedisLockUtils redisLockUtils;
@RequestMapping("/task")
public String task() {
long currentTimeMillis = System.currentTimeMillis();
String lockValue = UUID.randomUUID().toString();
long outTime = currentTimeMillis + 5 * 1000;
Boolean result = redisLockUtils.getLock(TASK_SCHEDULING, lockValue, 10000, 5000);
if(result) {
System.out.println(Thread.currentThread().getName() + "获取到锁,执行业务逻辑。。。");
if(System.currentTimeMillis() > outTime) {
System.out.println(Thread.currentThread().getName() + "因为没有可重入锁,当前时间比锁超时时间还长,此时可能存在误杀其他线程获取的锁,因此手动回滚事务");
}
redisLockUtils.unLock(TASK_SCHEDULING, lockValue);
return "获取锁成功";
}else {
System.out.println(Thread.currentThread().getName() + "获取锁超时。。。");
}
return "获取锁失败";
}
}
- 所使用的到的maven jar
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<!-- <version>2.7.1</version> --><!--版本号可根据实际情况填写-->
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<!-- <version>3.3.2</version> --><!--版本号可根据实际情况填写-->
</dependency>
</dependencies>
三.redis 分布式锁总结
虽然上述代码已经很大程度上解决了分布式锁可能存在的一些问题,但是没有实现可重入锁以及redis集群的方式,同时代码执行超时问题是基于判断方法执行时间是否超过了锁的时间,如果超过了就回滚事务。
redis的 Redisson框架可以用来实现redis分布式锁;
redis分布式锁这篇文章详细讲述了redis分布式锁从无到有搭建及解决各种问题。