小子我又来了,这次是redis缓存.虽然工作中咱也没有分布式啊,也没有啥高并发的场景啊,但是缓存作为工作中常用的中间件,基本的使用咱还是必须得掌握的.但是为了变强,为了迎娶白富美.哈哈,说这些都是虚的,还不是为了在下一次面试的时候不被面试官吊打,不至于输的太惨,对,就是这么真实.下面就是我忙里偷闲记得笔记,不喜勿碰哦…
缓存&分布式锁
一 缓存使用
1 使用缓存的目的: 为了提升系统性能,提升访问速速.
2 适合放入缓存的数据:
- 即时性,数据一致性要求不高的数据
- 访问频率高更新频率低的数据(多读少写)
3 缓存的分类
- 本地缓存:使用Map作为缓存的容器,缺点是占用虚拟机内存,在分布式架构中,缓存一致性问题解决不了
- 外置缓存:redis/memecache
二 springboot整合redis
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 配置redis
spring:
redis:
host: 192.168.217.128
port: 6379
springboot中操作redis的工具类
- redisTemplate:存入缓存的数据key为Object,value为Object
- stringRedisTemplate:存入缓存的数据key为String,value为Object
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
redisTemplate操作redis
@Slf4j
@SpringBootTest
class RedisApplicationTests {
@Autowired
private StringRedisTemplate redisTemplate;
@Test
void contextLoads() {
redisTemplate.opsForValue().set("hello", "world"+ UUID.randomUUID().toString());
log.info("result={}",redisTemplate.opsForValue().get("hello"));
}
}
redis中8大数据类型
- String:字符串
- Hash:哈希
- List:列表
- Set:集合
- Zset:有序集合
- geospatial:地理位置
- hyperloglog:基数,叔学上集合元素的个数,是不能重复的
- bitmap:地图
springboot2.0后默认使用lettuce作为操作redis的客户端.lettuce使用netty进行网络通信,在并发量高德情况下lettuce的bug会导致netty堆外内存溢出.
解决方案:
-
升级lettuce客户端
-
切换使用jedis
三 切换客户端为jedis
- 导入依赖,切换客户端
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
四 高并发下缓存问题
1 缓存穿透
- 问题描述:查询数据库和缓存中都不存在的数据.
- 风险:利用不存在的数据进行攻击,db压力瞬间增大,导致db崩溃
- 解决:1 null结果缓存,加入短暂的过期时间 2 布隆过滤器
2 缓存击穿
- 问题描述:大量请求,查询db中存在,缓存中不存在的数据
- 解决:加锁
3 缓存雪崩
- 问题描述:缓存在某一时刻同时失效,请求全部转发到db,导致db崩溃
- 解决:设置缓存数据随机的过期时间
五 加锁解决缓存击穿问题
单机情况下,使用同步代码块
Lock
private Lock lock = new ReentrantLock();
public List<Product> getProductByCode(String id){
lock.lock();
try{
//处理任务
//查询缓存
Product product = null;
String proStr = redisTemplate.opsForValue().get(id);
if(StrUtil.isNotBlank(proStr)){
//缓存命中
product = JSONUtil.toBean(proStr,Product.class);
}else{
//缓存不存在
//查询数据库
//模拟数据
product = Product.builder()
.id(id)
.build();
proStr = JSONUtil.toJsonStr(product);
//添加缓存
redisTemplate.opsForValue().set(id, proStr);
}
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
return product;
}
synchronize
public synchronized Product getProductByCode1(String id){
//处理任务
//查询缓存
Product product = null;
String proStr = redisTemplate.opsForValue().get(id);
if(StrUtil.isNotBlank(proStr)){
//缓存命中
product = JSONUtil.toBean(proStr,Product.class);
}else{
//缓存不存在
//查询数据库
//模拟数据
product = Product.builder()
.id(id)
.build();
proStr = JSONUtil.toJsonStr(product);
//添加缓存
redisTemplate.opsForValue().set(id, proStr);
}
return product;
}
分布式情况下使用分布式锁
redis命令中心: http://www.redis.cn/commands.html
NX - 只有键key不存在的时候才会设置key的值
命令 SET resource-name anystring NX EX max-lock-time
是一种用 Redis 来实现锁机制的简单方法。
解锁脚本的一个例子将类似于以下:
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
redis分布式锁的原理:借助redis的原子加锁命令与原子解锁脚本
分布式锁实现
public synchronized Product getProductByRedisLock(String id) throws InterruptedException {
Product product = null;
String uuid = UUID.randomUUID().toString();
//加锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10L, TimeUnit.SECONDS);
if(lock){
//加锁成功
//处理任务
String proStr = redisTemplate.opsForValue().get(id);
if(StrUtil.isNotBlank(proStr)){
//缓存命中
product = JSONUtil.toBean(proStr,Product.class);
}else{
//缓存不存在
//查询数据库
//模拟数据
product = Product.builder()
.id(id)
.build();
proStr = JSONUtil.toJsonStr(product);
//添加缓存
redisTemplate.opsForValue().set(id, proStr);
}
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
//解锁
redisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class), Collections.singletonList("lock"),uuid);
}else{
//没有获取到锁 进行自旋
Thread.sleep(100);
getProductByRedisLock(id);
}
return product;
}
加锁参数
- key: 锁的标识
- uuid: 防止解了别人的锁
- 过期时间: 防止出现死锁
上述加锁存在问题: 不可重入,当业务代码还没执行完,锁过期,其它线程获取锁
六 Redisson
Redisson: https://github.com/redisson/redisson
1 概述
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列
的分布式的Java常用对象,还提供了许多分布式服务。
2 使用
- 导入依赖:已经存在springboot自动化配置依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
- 构建RedissonClient客户端
@Configuration
public class RedissonConfig {
/**
* RedissonClient来操作redisson
* Redis url should start with redis:// or rediss:// (for SSL connection)
* @return
* @throws IOException
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.217.128:6379");
return Redisson.create(config);
}
}
3 分布式锁和同步器
3.1 可重入锁(Reentrant Lock)
@Test
void redissonTest() throws InterruptedException {
//可重入锁测试
//1. 解决业务时间过长,锁续期问题
Thread t1 = new Thread(this::execute);
Thread t2 = new Thread(this::execute);
t1.start();
t2.start();
Thread.sleep(60000);
}
private void execute() {
//获取锁
RLock lock = redissonClient.getLock("redisson_lock");
//加锁,默认锁的时间是30s
lock.lock();
try{
//执行业务代码
log.info("我是线程{},加锁成功,执行业务代码",Thread.currentThread().getName());
Thread.sleep(31000);
}catch (Exception e){
e.printStackTrace();
}finally {
//释放锁
lock.unlock();
log.info("我是线程{},解锁成功",Thread.currentThread().getName());
}
}
结果
redis中锁的数据: key:UUID+一个id
从控制台可以看出,获取锁的线程才可以执行业务代码,没有获取到锁的线程在阻塞进行等待,在加锁时也并没有设置锁的过期时间,默认是时间是30s,到期后业务未执行完,看门狗会自动进行续期,当锁未正常释放时,到期后会自动释放锁
注意: lock()方法不带参数,使用默认的"看门狗"时间,当"看门狗时间"/3,会进行自动续期,当带过期时间参数,超过时间后锁自动解开
3.2 读写锁
基于Redis的Redisson分布式可重入读写锁
RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
业务代码
@Slf4j
@Component
public class ProductServiceImpl {
@Autowired
private RedissonClient redissonClient;
private static Map<String,Product> map = new HashMap<>();
static {
Product product = Product.builder()
.id("1")
.code(UUID.randomUUID().toString())
.name("testProduct")
.price(new BigDecimal("123"))
.stone(10)
.build();
map.put("1", product);
}
public Product getProduct(String id) {
Product product = null;
//获取读写锁
RReadWriteLock lock = redissonClient.getReadWriteLock("read_write_lock");
RLock rLock = lock.readLock();
rLock.lock();
log.info("线程{}加读锁成功,开始读数据",Thread.currentThread().getName());
product = map.get("id");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rLock.unlock();
log.info("线程{}释放读锁成功",Thread.currentThread().getName());
}
return product;
}
public Product updateProduct(String id){
//获取读写锁
RReadWriteLock lock = redissonClient.getReadWriteLock("read_write_lock");
RLock writeLock = lock.writeLock();
writeLock.lock();
log.info("线程{}加写锁成功,开始写数据",Thread.currentThread().getName());
Product product = map.get("id");
map.put(id,product);
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
log.info("线程{}释放写锁成功",Thread.currentThread().getName());
}
return product;
}
}
写读
@Test
public void redissonRWLockTest() throws InterruptedException {
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executorService.submit(()->{
productService.updateProduct("1");
latch.countDown();
});
Thread.sleep(1000);
executorService.submit(() -> {
try {
productService.getProduct("1");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
latch.await();
}
写的同时,读会等待写锁释放
读写
@Test
public void redissonRWLockTest() throws InterruptedException {
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executorService.submit(() -> {
productService.getProduct("1");
latch.countDown();
});
Thread.sleep(1000);
executorService.submit(()->{
productService.updateProduct("1");
latch.countDown();
});
latch.await();
}
读的时候,写会阻塞等待读锁释放
读读
@Test
public void redissonRWLockTest() throws InterruptedException {
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executorService.submit(() -> {
productService.getProduct("1");
latch.countDown();
});
executorService.submit(() -> {
productService.getProduct("1");
latch.countDown();
});
/*Thread.sleep(1000);
executorService.submit(()->{
try {
productService.updateProduct("1");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});*/
latch.await();
}
读锁共享
写写
@Test
public void redissonRWLockTest() throws InterruptedException {
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
/*executorService.submit(() -> {
try {
productService.getProduct("1");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(1000);*/
executorService.submit(()->{
productService.updateProduct("1");
latch.countDown();
});
executorService.submit(()->{
productService.updateProduct("1");
latch.countDown();
});
latch.await();
}
写锁互斥
3.3 信号量(Semaphore)
同一共享的资源,每一个线程获取资源都必须获取去资源池获取令牌,当令牌获取完便进行等待,等别人释放资源.
可以进行限流操作,对资源进行保护
public void getResources(){
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
//设置资源数,只会设置一次
semaphore.trySetPermits(3);
try {
//请求资源
semaphore.acquire();
//执行业务代码
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
semaphore.release();
}
}
3.4 闭锁(CountDownLatch)
举个不恰当的例子:王公子请大家吃饭,等人全部到齐了开始吃饭
/**
* 多个分布在不同jvm虚拟机下的线程进一个汇合
*/
public void testCountDownLatch() throws InterruptedException {
//准备开饭
log.info("准备开饭");
RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(2);
//创建两个线程模拟其它虚拟机下的业务代码
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(() -> imComing());
executorService.submit(() -> imComing());
latch.await();
log.info("人已到齐,开饭");
}
/**
* 入席
*/
public void imComing(){
RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");
try {
Thread.sleep(2000);
log.info("宾客{}入席",Thread.currentThread().getName());
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
七 缓存一致性
缓存数据与数据库数据保存一致,高并发下会存在问题
1 双写并发
问题描述:A写数据库还没来得及写缓存,B写数据库写缓存,A再写缓存,缓存中的数据最后就是A修改后的数据
解决方案: 采用双删策略
2 读写并发
问题描述:A写数据库删除缓存,C读取到A写后的数据,B写数据库,B删除缓存,C再写缓存,缓存中是A的脏数据
解决方案:
- 并发量小的数据,不需要考虑缓存不一致问题,缓存数据加上过期时间,每隔一段时间自动更新.
- 通过加锁保证保证并发读写,写写时候按照顺序排好队.适用分布式读写锁.
- 可以适用canal订阅mysql的binlog日志方式.
总结:
- 放入缓存中的数据适用于多读少写的场景,数据实时性,一致性要求不高的场景
- 实时性,一致性要求高的数据,直接查询数据库
走在下班的路上,一阵春风吹来,咦,怎么感觉头有点冷…未完,待续