SpringBoot进阶教程(二十七)整合Redis之分布式锁

在之前的一篇文章(《Java分布式锁,搞懂分布式锁实现看这篇文章就对了》),已经介绍过几种java分布式锁,今天来个Redis分布式锁的demo。redis 现在已经成为系统缓存的必备组件,针对缓存读取更新操作,通常我们希望当缓存过期之后能够只有一个请求去更新缓存,其它请求依然使用旧的数据。这就需要用到锁,因为应用服务多数以集群方式部署,因此这里的锁就必需要是分布式锁才能符合需求。

学习本章节之前,建议依次阅读以下文章,更好的串联全文内容,如已掌握以下列出知识点,请跳过:

SpringBoot进阶教程(二十七)整合Redis之分布式锁

v简单实现

锁是针对某个资源的状态,保证其访问的互斥性,在实际使用当中,这个状态一般是一个字符串。使用 Redis 实现锁,主要是将状态放到 Redis 当中,利用其原子性,当其他线程访问时,如果 Redis 中已经存在这个状态,就不允许之后的一些操作。spring boot使用Redis的操作主要是通过RedisTemplate(或StringRedisTemplate )来实现。

1.1 将锁状态放入 Redis:

redisTemplate.opsForValue().setIfAbsent("lockkey", "value"); // setIfAbsent如果键不存在则新增,存在则不改变已经有的值。

1.2 设置锁的过期时间

redisTemplate.expire("lockkey", 30000, TimeUnit.MILLISECONDS);

1.3 删除/解锁

redisTemplate.delete("lockkey");

这么就是简单实现,但是1.1和1.2这么做,这两步违背了原子性,也就是一旦锁被创建,而没有设置过期时间,则锁会一直存在。

1.4 获取锁

redisTemplate.opsForValue().get("lockkey");

1.5 解决方案

spring data的 RedisTemplate 当中并没有这样的方法。但是在jedis当中是有这种原子操作的方法的,需要通过 RedisTemplate 的 execute 方法获取到jedis里操作命令的对象.

String result = template.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.set(key, "锁定的资源", "NX", "PX", 3000);
}
});

注意: Redis 从2.6.12版本开始 set 命令支持 NX 、 PX 这些参数来达到 setnx 、 setex 、 psetex 命令的效果,文档参见: SET — Redis 命令参考

NX: 表示只有当锁定资源不存在的时候才能 SET 成功。利用 Redis 的原子性,保证了只有第一个请求的线程才能获得锁,而之后的所有线程在锁定资源被释放之前都不能获得锁。

v锁的进阶

模拟一个比较常见的秒杀场景,这时候就需要用到锁。

2.1 创建RedisLockHelper

package com.demo.common;

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; /**
* Created by toutou on 2019/1/27.
*/
@Component
@Slf4j
public class RedisLockHelper {
@Autowired
private StringRedisTemplate stringRedisTemplate; /**
* 加锁
* @param targetId targetId - 商品的唯一标志
* @param timeStamp 当前时间+超时时间 也就是时间戳
* @return
*/
public boolean lock(String targetId,String timeStamp){
if(stringRedisTemplate.opsForValue().setIfAbsent(targetId,timeStamp)){
// 对应setnx命令,可以成功设置,也就是key不存在
return true;
} // 判断锁超时 - 防止原来的操作异常,没有运行解锁操作 防止死锁
String currentLock = stringRedisTemplate.opsForValue().get(targetId);
// 如果锁过期 currentLock不为空且小于当前时间
if(!Strings.isNullOrEmpty(currentLock) && Long.parseLong(currentLock) < System.currentTimeMillis()){
// 获取上一个锁的时间value 对应getset,如果lock存在
String preLock =stringRedisTemplate.opsForValue().getAndSet(targetId,timeStamp); // 假设两个线程同时进来这里,因为key被占用了,而且锁过期了。获取的值currentLock=A(get取的旧的值肯定是一样的),两个线程的timeStamp都是B,key都是K.锁时间已经过期了。
// 而这里面的getAndSet一次只会一个执行,也就是一个执行之后,上一个的timeStamp已经变成了B。只有一个线程获取的上一个值会是A,另一个线程拿到的值是B。
if(!Strings.isNullOrEmpty(preLock) && preLock.equals(currentLock) ){
// preLock不为空且preLock等于currentLock,也就是校验是不是上个对应的商品时间戳,也是防止并发
return true;
}
}
return false;
} /**
* 解锁
* @param target
* @param timeStamp
*/
public void unlock(String target,String timeStamp){
try {
String currentValue = stringRedisTemplate.opsForValue().get(target);
if(!Strings.isNullOrEmpty(currentValue) && currentValue.equals(timeStamp) ){
// 删除锁状态
stringRedisTemplate.opsForValue().getOperations().delete(target);
}
} catch (Exception e) {
log.error("警报!警报!警报!解锁异常{}",e);
}
}
}

这个是Redis加锁和解锁的工具类,里面使用的主要是两个命令,SETNX和GETSET。

SETNX命令 将key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做

GETSET命令 先查询出原来的值,值不存在就返回nil。然后再设置值 对应的Java方法在代码中提示了。 注意一点的是,Redis是单线程的!所以在执行GETSET和SETNX不会存在并发的情况。

2.2 创建Controller模拟秒杀场景

package com.demo.controller;

import com.demo.common.RedisLockHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; /**
* Created by toutou on 2019/1/27.
*/
@RestController
@Slf4j
public class RedisController { @Autowired
RedisLockHelper redisLockHelper; /**
* 超时时间 5s
*/
private static final int TIMEOUT = 5*1000; @RequestMapping(value = "/seckilling")
public String Seckilling(String targetId){
//加锁
long time = System.currentTimeMillis() + TIMEOUT;
if(!redisLockHelper.lock(targetId,String.valueOf(time))){
return "排队人数太多,请稍后再试.";
} int surplusCount = 0;
// 查询该商品库存,为0则活动结束 e.g. getStockByTargetId
if(surplusCount==0){
return "活动结束.";
}else {
// 下单 e.g. buyStockByTargetId //减库存 不做处理的话,高并发下会出现超卖的情况,下单数,大于减库存的情况。虽然这里减了,但由于并发,减的库存还没存到map中去。新的并发拿到的是原来的库存
surplusCount =surplusCount-1;
try{
Thread.sleep(100);//模拟减库存的处理时间
}catch (InterruptedException e){
e.printStackTrace();
}
// 减库存操作数据库 e.g. updateStockByTargetId // buyStockByTargetId 和 updateStockByTargetId 可以同步完成(或者事物),保证原子性。
} //解锁
redisLockHelper.unlock(targetId,String.valueOf(time)); return "恭喜您,秒杀成功。";
}
}

其他参考资料:

注:本文中很多内容来自以上链接的学习心得,感谢以上人员分享,也请转载本文的各站保持以上链接。

v源码地址

https://github.com/toutouge/javademosecond/tree/master/hellospringboot

作  者:请叫我头头哥

出  处:http://www.cnblogs.com/toutou/

关于作者:专注于基础平台的项目开发。如有问题或建议,请多多赐教!

版权声明:本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

特此声明:所有评论和私信都会在第一时间回复。也欢迎园子的大大们指正错误,共同进步。或者直接私信

声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。您的鼓励是作者坚持原创和持续写作的最大动力!

上一篇:SpringBoot学习(二)——Spring的Java配置方式


下一篇:SpringBoot进阶教程(二十五)整合Redis之@Cacheable、@CachePut、@CacheEvict的应用