Redis自学笔记
- NoSQL简介
- Redis简介
- 常用五大数据类型
- Redis6配置文件详解
- Redis6的发布和订阅
- Redis6新数据类型 Bitmaps
- Redis6新数据类型 HyperLogLog
- Redis6新数据类型 Geospatial
- Jedis操作Redis6*
- Redis与Springboot整合
- Redis事务和锁机制
- 事务的冲突问题
- Redis事务秒杀案例
- Redus6持久化操作之 RDB
- Redus6持久化操作之 AOF
- Redis 主从复制
- Redis集群
- 应用问题解决 缓存穿透
- 应用问题解决 缓存击穿
- 应用问题解决 缓存雪崩
- 分布式锁
- Redis6.0新功能
NoSQL简介
NoSQL作用
1解决session在不同服务器之间不能获取的问题
把session对象放入nosql内,这样各个服务器都能从里面取,而且不用进行io操作,因为这个数据库是放在内存中的。
2解决io压力
Nosql作为缓存数据库,里面存放一些常用的数据,就不用经常去数据库拿,可以直接在内存的nosql里面取数据
NoSQL特点
Not only sql 不仅仅是数据库 非关系型数据库
以简单的key-value模式存储
· 不遵循SQL标准
· 不支持ACID
· 远超于SQL的性能
适用于 高并发,海量的数据处理
数据库存储模式
行式数据库 查询单个数据行的数据会比较快
列式数据库 查询某个字段订单数据平均值或者总和会比较快
图关系数据库 地图,公共交通网络
Redis简介
Redis6 安装
Redis就是一个NoSQL数据库,key-value存储系统
支持的存储类型: string,list,set,zset,hash
1下载tar.gz 压缩包
2下载gcc环境 yum install gcc
3 tar命令解压tar.gz压缩包
4在redis目录下,执行 make (目前只是编译好了)
5安装 make install
安装之后默认是在 /usr/local/bin
6检查安装是否成功 cd /usr/local/bin --> ls 查看是否有redis
Redis6 启动
1前台启动
输入命令:redis - server
关窗口就使用不了了,不推荐
Ctrl C: 关闭
2后台启动 推荐
把redis的redis.conf 复制一份放在/etc下,且修改此文件的demonize no为yes
这个就是把配置文件修改为运行后台启动
输入命令,启动方式则为后台启动
直接这样也行
检查是否打开
Redis6 关闭
1单实例关闭 redis-cli shutdown
2找到进程号,用kill -9命令去杀死进程也可也
Redis6 介绍
默认6379端口 默认16个数据库,默认是用0号库
支持多种数据类型
支持持久化
单线程+多路IO复用
常用五大数据类型
String List Set Hash Zset
Redis键(key)
- 查看所有的key:keys *
- 增加数据: set key value
- 查询是否存在: exists key
- 删除数据: del key 直接删除 unlink key 异步删除
- 给key设置过期时间: expire key 10 (10秒)
- 查看还有多少秒过期: ttl k1 (-1表示永不过期,-2表示过期)
- 切换0到15号数据库: select 0-15
- 清空当前数据库 flushdb
- 清空全部0-15数据库 flushall
测试截图
Redis字符串
1简介
String是Redis最基本的类型
String是二进制安全的,意味着Redis的string可以包含任何数据,比如jpg图片或者序列化对象,最多可以是512M
2常用命令
- set k1 / get k1
- append k1 abc 追加到k1的value后面
- strlen k1 获取值的长度
- setnx key value 只有key不存在的时候,才能设置成功
- Incr k1 对k1的value值加1,是字符串则设置不成功,必须是数字类型
- decr k1 对k1的value值减1,是字符串则设置不成功,必须是数字类型
- Incrby key 步长 自定义设置加上的大小
- decrby key 步长 自定义设置减去的大小
- mset key value key value 设置多个key-value
- mget key1 key2 key3 获取多个value
- msetnx key1 value1 key2 value2 当有任何一个key已经存在,则设置不成功
- getrange key 起始位置,结束位置 获取value的指定截取长度,类似submit
- setrange key 3,abc 修改key的value值的下标为3的位置往后的值为abc
- setex key 过期时间 value
- getset key value 以新换旧,设置新值
Java的i++是否是原子操作?不是
两个线程分别执行i++100次,结果的范围是== 2-200==
i++有取值 和 ++ 和 赋值的操作,没有原子操作的话,可能在取值成功了,然后赋值被打断了,就没有赋成功,互相会产生影响
3数据结构
底层是一个简单动态字符串数组arraylist,预设了长度,如果字符串过长,则会扩容,每次扩容一倍,最大为512M
Redis列表
List 单键多值,底层是一个双向链表
-
lpush/rpush key1 value1 value2 从左边/右边插入一个或多个值
-
lpop/rpop key1 从左边/右边吐出一个值 值在键在,值光键亡
-
rpoplpush key1 key2 从key1列表右边吐出一个值,插到key2列表左边
-
lrange key start stop 按照索引下标获取元素(从左到右)
-
rrange key start stop 按照索引下标获取元素(从右到左)
0 -1 表示获取所有索引 -
lindex key index 按照索引下标获取元素(从左到右)
-
llen key 获取列表长度
-
linsert key before/after value newvalue 在value的后面插入newvalue值
-
lrem key n value 从左边删除n个value
-
lset key index value 在指定下标替换为value值
数据结构:
List的数据结构为快速链表quicklist
多个压缩列表 ziplist 使用双向指针串起来使用
Redis集合
Set 集合
-
sadd key value1 value2 将多个member元素加入到集合key中,自动去重
-
smembers key 取出该集合的所有值
-
sismember key value 判断集合 key 是否含有该 value 值,有1 没有0
-
scard key 返回该集合的元素个数
-
srem key value1 value2 删除集合中的某个元素
-
spop key 随机从该集合中吐出一个值
-
srandmember key n 随机从该集合中取出n个值,不会删除
-
smove source destination value 把集合中的一个value放入另一个集合中
-
sinter key1 key2 返回两个集合的交集元素
-
sunion key1 key2 返回两个集合的并集元素
-
sdiff key1 key2 返回两个集合的差集元素
数据结构:
是用dict字典,字典是用哈希表实现的
Set结构用的是hash结构,所有的value都指向同一个对象
Redis哈希
Hash是一个键值对集合
常用来存储对象,每一个value可以是一个属性映射,key表示一个对象
存储对象的方法:
1用json字符串 user:{id=1,name=zhangsan} 不方便修改属性值
2 分为多段数据 user:id 1
user:name zhangsan
这样存储会非常的混乱
3 hash 以user作为key,value中的属性作为映射,映射每一个属性值
常用命令:
-
hset key field value 给key集合中的field键赋值value
-
Hget key1 field 从key集合field取出value
-
Hmset key1 field1 value1 field2 value2 批量设置hash值
-
Hexists key1 field 查看哈希表key中,给定域field是否存在
-
Hkeys key 列出该hash集合的所有field
-
Hvals key 列出该hash集合的所有value
-
Hincrby key field increment 为哈希表key中的域field的值加上增量
-
Hsetnx key field value 将哈希表key中的域field 的值设置为value,当且仅当域field不存在
数据结构:
hash类型对应的数据结构是两种 ziplist(压缩列表) hashtable(哈希表) 当field-value长度较短且个数少时,使用ziplist,否则使用hashtable
Redis有序集合
Zset
有序集合和set相似,也是自动去重
不同的是有序集合的每个成员都关联了一个score,根据此可以排序
-
zadd key score1 value1 score2 value2 score3 value3
-
zrange key 0 -1 里面自动根据评分来排序显示出来
-
Zrangebyscore key (min max) withscore
查询min到max之间的value包括分数显示出来 -
Zrevrangebyscore key (max min)withscore
查询从大到小排序包括分数显示出来 -
Zincrby key increment value 为元素的score加上增量increment
-
Zrem key value 删除该集合下,指定值的元素
-
Zcount key min max 统计该集合,分数区间内的元素个数
-
Zrank key value 返回该值在集合终端排名 从0 开始
数据结构:
是redis提供的一个非常特别的数据结构,等价于map,有value,score,底层又类似于treeset,内部的元素会按照score进行排序。
Redis6配置文件详解
/etc/redis.conf
bind 127.0.0.1 -::1 目前访问只能通过本地访问redis
protected-mode no 支持远程访问
Port 6379 默认端口号
tcp-backlog 511 是队列总和=未完成的三次握手+已完成的三次握手队列
Timeout 0 永不超时
Tcp-keepalive 300 心跳时间,检查是否在操作,不在就释放
Database 16 默认是16个数据库
Maxmemory 10000 默认可连接用户是10000
Redis6的发布和订阅
是一种消息的通信模式:发送者发送消息,订阅者接收消息
Redis的客户端可以订阅多个频道
代码实现:
Publish发步给channel1一句hello
Subscribe订阅了channel1,则可以接收到hello信息
没有持久化,没订阅之前的内容,都不会进行保留
Redis6新数据类型 Bitmaps
本身不是一种数据类型,其实就是只含有0和1的字符串,可以对字符串的位进行操作
可以把bitmaps想象成一个以位为单位的数组,数组的每个单位只能储存0和1,数组的下标值bitmaps中叫做偏移量
- Setbit key 1 1/0 设置bitmaps中第某个偏移量的值
- Setbit key 6 1/0
- Getbit key 1 获取bitmaps中某个偏移量的值
- Bitcount key 统计里面1的数量
- Bitop and(or/not/xor) destkey [key…]
这是一个复合操作,可以做多个bitmaps的与或非异或操作,并且结果放到destkey
如果每一天的所有用户访问可以记作一个bitmaps数据,计算两天都访问的用户
Bitop and destkey 第一天的key,第二天的key
其实原理就是对每一位去and运算,如果用户两天都访问了,那对应的那一位都是1,1and1就是1
同理,计算两天访问过的用户可以用or计算
Redis6新数据类型 HyperLogLog
内部计算不重复元素的个数,也就是基数,非常快速,使用的内存很少
1 pfadd
Pfadd program ”java“ 1
Pfadd program ”java“ 0
Pfadd program ”c++“ 1
每一次加入,都会计算输出基数,如果基数不变就0 变了就1
2 Pfcount program 返回总数量
3 pfmerge destkey program1 program2 合并两个program到destkey中
Redis6新数据类型 Geospatial
GEO,地理信息的缩写
该类型,就是元素的二维坐标,在地图上就是经纬度,redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度hash等常见操作
1 geoadd
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing
2 geopos
geopos china:city shanghai 获取地区的具体位置数据
3 geodist
geodist china:city shanghai chongqing km/m/… 计算地区的直线距离
4 georadius
georadius china:city 110 30 1000 km
获取(110,30)经纬度附近1000km以内的城市
Jedis操作Redis6*
通过java操作redis 支持java语言对redis操作
对配置文件要修改,
1 bind之前要加上#
2 把protected设为no
3关闭防火墙 systemctl stop firewalld
4 重启redis
Idea测试连接:
1引入依赖
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
2测试ping
输出PONG则说明连接成功
操作测试:
@Test
public void demo1(){
Jedis jedis = new Jedis("192.168.244.130",6379);
//添加
jedis.set("name","lucy");
//获取
System.out.println(jedis.get("name"));
//查询过期时间
Long time = jedis.ttl("name");
System.out.println(time);
//操作多个
jedis.mset("k1","v1","k2","v2");
List<String> mget = jedis.mget("k1", "k2");
System.out.println(mget);
Set<String> keys = jedis.keys("*");
for (String key : keys) {
System.out.println(key);
}
}
@Test
public void demo2(){
Jedis jedis = new Jedis("192.168.244.130",6379);
//设置左插入
jedis.lpush("list1","v1","v2","v3");
//从左边开始读取
List<String> value = jedis.lrange("list1", 0, -1);
System.out.println(value);
}
@Test
public void demo3(){
Jedis jedis = new Jedis("192.168.244.130",6379);
//插入,自动去重
jedis.sadd("name1","lucy","jack","lucy");
//读取
Set<String> name = jedis.smembers("name1");
System.out.println(name);
}
@Test
public void demo4(){
Jedis jedis = new Jedis("192.168.244.130",6379);
//插入,自动去重
jedis.hset("user","name","yuan");
jedis.hset("user","age","22");
String age = jedis.hget("user", "age");
String name = jedis.hget("user", "name");
System.out.println(age+" "+name);
}
@Test
public void demo5(){
Jedis jedis = new Jedis("192.168.244.130",6379);
//插入,自动去重
jedis.zadd("china",100d,"guandong");
jedis.zadd("china",190d,"beijing");
jedis.zadd("china",10d,"shanghai");
//zset内部根据score自动排序,相当于treeset数据结构
Set<String> china = jedis.zrange("china", 0, -1);
System.out.println(china);
}
手机验证码实现
要求
1输入手机号,点击发送后随机生成6位数字码,2分钟有效
2输入验证码,点击验证,返回成功或失败
3每个手机号每天只能输入3次
public class PhoneNumber {
public static void main(String[] args) {
//模拟验证码发送
verifyCode("134343");
getRedisCode("134343","4545");
}
//3验证码校验
public static void getRedisCode(String phone,String code){
Jedis jedis = new Jedis("192.168.244.130",6379);
//验证码的key
String codeKey = "VerifuCode"+phone+":code";
String redisCode = jedis.get(codeKey);
//判断
if(redisCode.equals(code)){
System.out.println("成功");
}
else{
System.out.println("失败");
}
jedis.close();
}
//2每个手机每天只能发送三次,验证码放到redis中,设置过期时间
public static void verifyCode(String phone){
Jedis jedis = new Jedis("192.168.244.130",6379);
//拼接key
String countKey = "VerifyCode"+phone+":count";
String codeKey = "VerifuCode"+phone+":code";
String count = jedis.get(codeKey);
if(count == null ){
//没有发送过
//设置发送次数为1
jedis.setex(codeKey,24*60*60,"1");
}else if(Integer.parseInt(count) <=2){
jedis.incr(codeKey);
}else{
//发送三次,不能再发送了
System.out.println("今天已经发送三次,不能再发送");
}
//发送的验证码放到redis中
String vcode = getCode();
jedis.setex(codeKey,120,vcode);
jedis.close();
}
//1生成6位数字验证码
public static String getCode(){
Random random = new Random();
StringBuilder stringBuilder = new StringBuilder();
for(int i=0;i<6;i++){
int rand = random.nextInt(10);
stringBuilder.append(rand);
}
return stringBuilder.toString();
}
}
Redis与Springboot整合
1引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
2配置application.properties
#配置redis
spring.redis.host=192.168.244.130
spring.redis.port=6379
spring.redis.database=0
spring.redis.timeout=1800000
spring.jta.atomikos.properties.max-actives=20
spring.redis.lettuce.pool.max-idle=5
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.min-idle=0
3写redis的配置类
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
4修改starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.4.5</version>
</dependency>
5测试controller
@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;
@GetMapping
public String testRedis(){
redisTemplate.opsForValue().set("name","lucy");
//从redis获取数据
String name = (String) redisTemplate.opsForValue().get("name");
return name;
}
}
Redis事务和锁机制
Redis事务的主要作用就是串联多个命令防止别的命令插队
Multi,Exce,discard
从输入multi命令开始,输入的命令都会依次进入命令队列,但不会执行,直到输入Exec之后,Redis会将之前的命令队列中的命令进行依次执行
组队的过程中,可以通过discard来发起组队
测试
事务的错误处理
错误情况
1在组队阶段的时候,有一个错误,那最终都不会执行
2在执行阶段的时候,有一个错误,则单独这个不会执行
事务的冲突问题
如果三个人对同一个账户进行处理,同时请求减不同的金额值
解决方案:
悲观锁
认为每次操作都不安全,每次操作之前都加锁,别人不能操作
缺点就是效率很低,只能一个一个操作
乐观锁
对每个操作设置一个新的版本号,在操作之前,对当前的数据的版本号和数据库更新的版本号进行比较,如果版本号不一致,那就不能进行操作
乐观锁使适用于多读的应用类型在,这样可以提高吞吐量,Redis就是利用这种check-and-set机制实现事务的
Watch
乐观锁案例
Watch key 如果执行在multi之前,就等于是对key进行了监视。
那在exec执行之前,如果修改了这个key,那执行就会报错,因为修改了key,等于修改了内部的版本号,在exec执行的时候判断出版本号不一致,就会取消执行。
Unwatch key 就是取消监视
Redis事务三特性
- 单独的隔离操作
- 没有隔离级别的概念
- 不保证原子性 在执行的时候一个错误,后面还会继续执行,不会回滚
Redis事务秒杀案例
导入seckill文件
利用ab工具模拟开发
1 yum install httpd-tools 下载ab测试工具
ab -n 1000 -c 100 一千个请求中有100个并发处理的
2vim postfile 模拟表单提交参数,以&符号结尾;存放当前目录。
内容:prodid=0101&
3执行
ab -n 2000 -c 200 -k -p ~/postfile
-T application/x-www-form-urlencoded http://192.168.2.115:8088/Seckill/doseckill
模拟并发效果
解决问题:
1 连接超时
使用连接池
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
2 超卖问题
//秒杀过程
public static boolean doSecKill(String uid,String prodid) throws IOException {
//1 uid和prodid非空判断
if(uid == null || prodid == null) {
return false;
}
//2 连接redis
//Jedis jedis = new Jedis("192.168.44.168",6379);
//通过连接池得到jedis对象
JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
Jedis jedis = jedisPoolInstance.getResource();
//3 拼接key
// 3.1 库存key
String kcKey = "sk:"+prodid+":qt";
// 3.2 秒杀成功用户key
String userKey = "sk:"+prodid+":user";
//监视库存
jedis.watch(kcKey);
//4 获取库存,如果库存null,秒杀还没有开始
String kc = jedis.get(kcKey);
if(kc == null) {
System.out.println("秒杀还没有开始,请等待");
jedis.close();
return false;
}
// 5 判断用户是否重复秒杀操作
if(jedis.sismember(userKey, uid)) {
System.out.println("已经秒杀成功了,不能重复秒杀");
jedis.close();
return false;
}
//6 判断如果商品数量,库存数量小于1,秒杀结束
if(Integer.parseInt(kc)<=0) {
System.out.println("秒杀已经结束了");
jedis.close();
return false;
}
//7 秒杀过程
//使用事务
Transaction multi = jedis.multi();
//组队操作
multi.decr(kcKey);
multi.sadd(userKey,uid);
//执行
List<Object> results = multi.exec();
if(results == null || results.size()==0) {
System.out.println("秒杀失败了....");
jedis.close();
return false;
}
//7.1 库存-1
//jedis.decr(kcKey);
//7.2 把秒杀成功用户添加清单里面
//jedis.sadd(userKey,uid);
System.out.println("秒杀成功了..");
jedis.close();
return true;
}
Redus6持久化操作之 RDB
持久化操作的第一种是写入rdb文件
在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里
Rdb文件,在创建连接的时候会自动创建一个rdb文件
Key改变的越快,持久化save的操作就越快
如果设置30秒内10个改变进行save操作,则一次性加入11个key的话,最后一个不会持久化到rdb文件中,这就是rdb的丢失数据原因
上面的save是手动保存,会全部阻塞,一般不使用
Bgsave是异步自动保存,不会阻塞
RDB备份是如何执行的
Redis会单独创建一个fork子进程来进行持久化,会先将数据写到一个临时文件中,待持久化过程结束了,再用这个临时文件代替上次持久化好的文件,整个个过程中,主进程是不进行任何IO操作,这就确保了极高的性能,如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,RDB方式要比AOF方式更加的高效,缺点是最后一次持久化的数据可能丢失
RDB备份恢复
先通过config get dir 查询rdb文件的目录
将*.rdb的文件拷贝到别的地方
rdb的恢复
关闭Redis
先把备份的文件拷贝到工作目录下 cp dump2.rdb dump.rdb
启动Redis, 备份数据会直接加载
优势就是 恢复快,劣势是 占用2倍的内存数据量,因为克隆了一份
Redus6持久化操作之 AOF
AOF启动
Append only file 对文件进行内容的追加
只记录写操作,不记录读操作,只需追加文件,不能修改文件
RDB默认是开启的,而AOF默认是不开启的
在/etc/redis.conf修改为yes
重启
打开文件位置则可以看到aof文件
如果AOF和RDB同时开启,系统默认取AOF的值
AOF 恢复
跟RDB基本是一样的
正常恢复
将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)
恢复:重启redis然后重新加载
异常恢复
修改默认的appendonly no,改为yes
如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof–fix appendonly.aof进行恢复
备份被写坏的AOF文件
恢复:重启redis,然后重新加载
AOF同步频率设置
- appendfsync always
始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好 - appendfsync everysec
每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。 - appendfsync no
redis不主动进行同步,把同步时机交给操作系统。
Redis 主从复制
主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制
Master以写为主,Slave以读为主
主服务器只能有一台,从服务器可以有多台,复制主服务器的数据
一主多从
读写分离,主机做写操作,从机做读操作
容灾快速恢复
搭建一主多从服务器
1创建/myredis文件夹
2复制redis.conf配置文件到此文件夹中
3配置一主两从,新建三个.conf配置文件
创建redis6379.conf
创建三个配置文件
4启动三个redis服务
5查看主从状态
查看主机的运行状况:info replication
6配置从服务器
Slaveof 127.0.0.1 6379 加入到6379,作为从服务器
在master主机中可以操作set写操作,在savler是不行的,从机不能进行写操作
一主二仆
从服务器挂掉之后,不能自动变成从服务器,而是变成主服务器,需要的话可以再次slaveof加入到主从关系中,加入主服务器中后,会复制主服务器内的数据。
主服务器挂掉之后,从服务器还是从服务器,还是主服务器的小弟,不会篡位。
主服务器重新启动起来,还是回到主的位置,属于它的从服务器还是存在。
主从复制的原理
1从连接上主服务器之后,从服务器向主服务器发送进行数据同步的消息
2主服务器接到从服务器发送过来的同步消息,把主服务器数据进行持久化rdb文件。把rdb文件发送从服务器,从服务器拿到rdb进行读取
3每次写操作之后,和从服务器进行数据同步
第一次读取复制是全量复制,后面追加内容是增量复制
薪火相传,反客为主
上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。
反客为主
当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。
用 slaveof no one 将从机变为主机。
这个是手动,是比较麻烦的,后面讲到哨兵模式,当主机坏了,从机会自动上位,反客为主自动版
哨兵模式
反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库
1一主二从配置完成
2配置哨兵
在:myredis中加入 sentinel.conf ,写入
3启动哨兵
redis-sentinel /myredis/sentinel.conf
开启成功
当6379主机挂掉,哨兵就监控到了,就会在从服务器中选择一个作为主服务器上位,而6379也还是存在,当6379重新启动,会变成从服务器。
在上位的从服务器选择中,优先级作为判断依据,replca-priority 越小优先级越高
Redis集群
无中心化集群
任何一台服务器都可以作为请求的入口,服务器之间是连通的,可以互相调用
搭建Redis集群
cluster nodes 命令查看集群信息
redis cluster 如何分配这六个节点?
一个集群至少要有三个主节点。
选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。
什么是slots
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,
集群使用公式 CRC16(key) % 16384来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:
- 节点 A 负责处理 0 号至 5460 号插槽。
- 节点 B 负责处理 5461 号至 10922 号插槽。
- 节点 C 负责处理 10923 号至 16383 号插槽。
在集群中录入值
在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽**,redis会报错**,并告知应前往的redis实例地址和端口。
redis-cli客户端提供了 –c 参数实现自动重定向。
如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。
不在一个slot下的键值,是不能使用mget,mset等多键操作。
可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。
查询集群中的值
CLUSTER GETKEYSINSLOT 返回 count 个 slot 槽中的键。
故障恢复
如果主节点下线?从节点能否自动升为主节点?注意:15秒超时
主节点恢复后,主从关系会如何?主节点回来变成从机。
如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉
如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。
集群的Jedis开发
即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。
无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。
public class JedisClusterTest {
public static void main(String[] args) {
Set<HostAndPort>set =new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.31.211",6379));
JedisCluster jedisCluster=new JedisCluster(set);
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
}
}
节约了代理服务器
应用问题解决 缓存穿透
1应用服务器压力变大了,请求很多,请求一过来,是先去redis查缓存
2会使redis的命中率降低,就是查不到需要的数据,这就会去查数据库的内容
3一直查询数据库,造成数据库压力过大,使服务器瘫痪。
一般是遭受到黑客的恶意url攻击
(1)解决方法:
对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟
(2)设置可访问的名单(白名单):
使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
(3)采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
(4)进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务
应用问题解决 缓存击穿
1数据库访问压力瞬间增加
2 redis里面没有出现大量key过期
3 redis正常运行
原因:redis中某个key过期了,大量访问这个key
解决问题:
(1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长
(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长
(3)使用锁:
(1)就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
(2)先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key
(3)当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
(4)当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。
应用问题解决 缓存雪崩
1数据库压力变大
在极少时间段,查询大量key的集中过期情况
解决方案:
构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)
使用锁或队列:
用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况
设置过期标志更新缓存:
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
将缓存失效时间分散开:
比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
分布式锁
单机锁的话,其他服务器是不知道这个锁的,单机锁只能独立于单个服务器内,这时就需要共享锁,也叫分布式锁,可以用redis创建出来
setnx key value 设置锁
del key 释放锁
expire key 10 设置锁的过期时间,防止占着锁不释放
解决原子性操作
set key value nx ex 10 上锁又设置过期时间为10秒,同步进行
优化之UUID防误删
防止锁误删,对锁加上uuid标识,在加锁的时候对value加锁uuid标识,在解锁的时候判断是否是自己的uuid,如果是自己的锁,才能操作解锁
分布式锁条件
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- ==互斥性=。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
- 加锁和解锁必须具有原子性。
Redis6.0新功能
1 acl 权限功能
acl list :用户权限查看
acl setuser user1 添加用户
2 IO多线程
单线程加多路复用
IO多线程其实指客户端交互部分的网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。