特点
- C 语言编写
- 基于内存的 Key-Value 数据存储
- 不需要预定义数据结构
- 支持多种类型的数据:string,hash,list,set,sorted set,bitmaps,hyperloglog,geospatial
- 只支持简单的读写操作,不支持 SQL,操作都是原子的
- 读写业务采用单线程模式,网络 IO 可以采用多线程(版本 6 引入,默认关闭)
- 周期性的数据落盘(可以关闭,默认是启动的,每 15 分钟落盘)
- 支持 TTL
- 支持事务处理
- 支持 Pipeline 批量处理
- 支持消息的 Pub/Sub
- Stream 消息队列(版本 5 引入),支持历史数据、分组、游标、阻塞/非阻塞消费
- 安全机制:密码认证(默认关闭),版本 6 开始支持 SSL 证书认证、ACL 权限控制
- 支持分布式部署(最终一致性)
- 性能:每秒执行 10W+ 命令(单机、非批量模式)
- 应用:数据库,缓存(LRU 算法),消息队列(Pub/Sub、Stream),分布式锁(setnx)
更多内容查看官网 https://redis.io/
基本配置
配置文件 redis.conf
# index(类似关系数据库的表,用数字表示,没有名字)的个数,默认 16
databases 16
# 数据落盘,比如每 300 秒如果有 10 条数据更新,将更新的数据落盘,可以同时配多个规则,默认打开
save 900 1
save 300 10
save 60 10000
# aof 机制,将每个写操作记录到磁盘,默认关闭
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
# 允许使用的最大内存,默认不限制
maxmemory <bytes>
# 超过最大内存后如何处理
# 默认 noeviction,即不处理,直接报错
# 可以设置使用 LRU 算法 volatile-lru 将长时间没使用的数据删除,这样 redis 就变成传统意义上的缓存
maxmemory-policy noeviction
# 密码(默认不配置)
requirepass foobared
查看该文件可以看到更多配置
单线程设计
Redis 的高性能和稳定性主要有以下原因
- 基于内存,读写性能好
- 只支持简单的操作和数据结构,从根本上保证用户不会因为复杂的操作影响性能
- C 语言性能更好
- 读写业务采用单线程设计,更简单,更稳定
- 分布式部署时采用最终一致性,而不是强一致性,提高了响应速度和稳定性
这里讲一下单线程
Redis 的单线程设计,指的是读写业务由一个线程完成,并且没有使用异步编程,也就是说 Redis 同一时间只能操作一个命令,不支持并发操作,但这并不影响性能,因为基于内存、数据简单、命令简单,所以每个命令都是很快执行完的,实际上 CPU 并不是瓶颈,所以单个线程就足够了
其他需要通过多线程,或是通过异步编程,来提供高并发性能的场景,通常都是因为需要磁盘 IO 操作,或是命令执行起来本身就比较复杂比较耗时,所以才需要多线程、异步编程的方式来提高性能
由于单线程模式,要明显地比多线程模式、异步编程模式,要简单很多,稳定很多,所以也提高了 Redis 的稳定性
Redis 还有其他线程处理其他事情,比如周期性落盘需要自己的线程,但这和读写业务是独立的,互不影响的
从版本 6 开始,可以通过配置,将网络 IO 分出来,用多个线程处理,这是因为随着业务量越来越大,虽然 CPU 不是瓶颈,但网络能力会有一定的限制,为了提高并发能力,用多个线程处理网络 IO,但读写业务还是单线程的
基本命令
help
127.0.0.1:6379> help
redis-cli 3.0.6
Type: "help @<group>" to get a list of commands in <group>
"help <command>" for help on <command>
"help <tab>" to get a list of possible help topics
"quit" to exit
127.0.0.1:6379>
127.0.0.1:6379> help @hash
127.0.0.1:6379>
127.0.0.1:6379> help hget
@connection
ping 测定连接是否存活
select index 选择数据库,index 用数字值指定,以 0 作为起始索引值,默认使用 0 号数据库
@server
info 获取服务器的信息和统计
dbsize 返回当前数据库中 key 的数目
flushdb 删除当前选择数据库中所有的 key
flushall 删除所有数据库中的所有的 key
config get p 获取服务器配置的信息
config set p v 设置配置项信息
bgsave 后台保存 rdb 快照
save 保存 rdb 快照
@generic
keys pattern 查找相应的 key,支持通配符 *、?、[],keys * 返回所有 key
SCAN cursor 迭代获取数据库的 key
exists key 判断某个 key 是否存在,不支持通配符
type key 返回 key 存储的类型
move key index 将 key 移动到另一个数据库
del key 删除 key
expire key 设置 key 的生命周期以秒为单位
sort key 对 list,set 里面的数据排序
@string
set key value [ex 秒][px 毫秒][nx/xx] 设置 key 的 value
ex/px 设置超时时间
nx 表示只有 key 不存在才执行,xx 表示只有 key 存在才执行
默认不会超时,并且无论 key 是否存在都会执行
setnx key value 等于 set key value nx
setex key time value 等于 set key value ex time
psetex key milliseconds value 等于 set key value px time
mset key1 value1 key2 value2 一次性设置多个
get key
mget key1 key2 一次性读取多个
getset key value 返回旧值,设置新值
strlen key 取指定 key 的 value 值的长度
append key value 把 value 追加到 key 的原值上
setrange key offset value 部分重写字符串
getrange key start stop 获取字符串中 [start, stop] 范围的值
incr key 加 1
incrby key increment 加 increment
incrbyfloat by increment
decr key
decrby key decrement
setbit key offset value 设置 offset 对应位的值,返回该位上的旧值
getbit key offset
bitop operation destkey key1 [key2..] 对 key1 key2 做 operation (operation) 并将结果保存在 destkey
@list
lpush key value 把值插入到链表头部
rpush key value 把值插入到链表尾部
lpop key 返回并删除链表头部元素
rpop key 返回并删除链表尾部元素
lrange key start stop 返回链表中 [start, stop] 中的元素
lrem key count value 从链表中删除 value 值,最多删除数量为 count
ltrim key start stop 剪切 key 对应的链表,取 [start, stop] 一段,并把该段值重新赋给 key
lindex key index 返回列表中第 index 个元素
llen key 链表的元素个数
linsert key after|before search value 在 key 链表中寻找 search,并在 search 值之前|之后插入 value
rpoplpush source dest 把 source 的末尾拿出,放到 dest 头部,并返回该元素
@set
sadd key value1 value2 往集合里面添加元素
smembers key 获取集合所有的元素
srem key value 删除集合某个元素
spop key 随机返回并删除集合中1个元素
srandmember key 随机取一个元素
sismember key value 判断集合是否有某个值
scard key 返回集合元素的个数
smove source dest value 把 source 的 value 移动到 dest 集合中
sinter key1 key2 key3 求交集
sunion key1 key2 求并集
sdiff key1 key2 求差集
sinterstore res key1 key2 求交集并存在 res
@sorted_set
zadd key score member [score member ...] 添加 member 到集合,按 score 排序,如果 member 已经存在则更改 score
zcard key 返回集合的所有元素
zcount key min max 返回 score 在 [min, max] 区间内元素数量
zrem key member [member ...] 删除元素
zremrangebyscore key min max 删除 score 在 [min, max] 之间的元素
zremrangebyrank key start end 删除次序在 [start, end] 之间的元素
zrank key member 查询 member 排名,升序
zrevrank key member 查询 member 排名,降序
zrange key start stop 返回次序在 [start,stop] 的元素
zrangebyscore key min max 返回 score 在 [min, max] 内的元素
@hash
hset key field value 设置 field 的值
hsetnx key field value 只在 field 不存在的情况下设置
hmset key field1 value1 field2 value2 同时设置多个 field
hget key field 获取指定的 field
hmget key field1 field2 获取多个 field
hincrby key field increment 指定的 field 加上给定的值
hexists key field 指定的 field 是否存在
hlen key 返回 field 数量
hdel key field 删除指定的 field
hkeys key 返回所有的 field
hvals key 返回所有的 value
hgetall key 获取全部的 field 及 value
可以看到这些命令,以及数据结构,都比较简单
这些命令全都是原子操作
更多命令查看 help
Python 代码
import redis
pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)
redisConn.set("key_1", 123)
redisConn.set("key_2", "value_2")
print redisConn.keys()
print redisConn.get("key_1")
需要先通过 pip install redis 安装 python 包
Java 代码(单独使用)
pom
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
使用
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
config.setMaxIdle(10);
config.setMaxWaitMillis(10000);
JedisPool jedisPool = new JedisPool(config, "localhost", 6379, 10000);
Jedis jedis = jedisPool.getResource();
jedis.set("key_3", "hello world");
System.out.println(jedis.keys("*"));
System.out.println(jedis.get("key_3"));
可以是 pool 也可以直接连接
Java 代码(结合 SpringBoot)
pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置
spring:
redis:
host: localhost
port: 6379
database: 0
timeout: 10000
pool:
max-active: 200
初始化
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<Object>(Object.class);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
使用
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
redisTemplate.opsForValue().set("key_5", 123456);
System.out.println(redisTemplate.keys("*"));
System.out.println(redisTemplate.opsForValue().get("key_5"));
}
不定义 redisTemplate 函数也可以,但那样系统自动初始化的会是 RedisTemplate<Object, Object>,也就是 key 和 value 都是 Object 的,需要自己再转,不好用,所以最好还是自己定义 redisTemplate 函数,自己初始化 RedisTemplate,把 key 类型设置成 String 的比较好
TTL 设置
redis> SET mykey "Hello"
"OK"
redis> EXPIRE mykey 10
(integer) 1
redis> TTL mykey
(integer) 10
redis> SET mykey "Hello World"
"OK"
redis> TTL mykey
(integer) -1
redis>
EXPIRE 命令可以设置超时时间,单位是秒
TTL 命令查看还剩多少时间超时,返回 -1 表示没有设置超时时间,返回 -2 表示 key 不存在
如果重新设置 key 的值,会自动取消超时时间设置
Transactions 事务性
> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1
Redis 的事务不做回滚,而且可能部分成功部分失败,只保证将整个事务作为一个原子操作,即 Redis 在处理事务的过程中,不会处理其他 client 的请求,要等事务处理完了才继续处理其他请求
Pipeline 流水线处理
# python
import redis
pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)
pipeline = redisConn.pipeline()
pipeline.set("key_1", 10)
pipeline.set("key_2", 20)
pipeline.set("key_3", 30)
pipeline.execute()
pipeline 功能貌似在命令行下不支持,必须通过代码实现
execute 的时候才真正执行
连续发送请求,但是中间不等待回复,而是最后一次性接收所有回复
不保证原子性,不会阻止其他 client 的请求
Pub-Sub
sub
# sub
import redis
pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)
pubsub = redisConn.pubsub()
pubsub.subscribe(["channel_1", "channel_2"])
# pubsub.psubscribe(["channel*", "*hi*"])
for item in pubsub.listen():
if item[‘type‘] == ‘message‘:
print item[‘channel‘]
print item[‘data‘]
pub
# pub
import redis
pool = redis.ConnectionPool(host="localhost", port=6379, db=5)
redisConn = redis.Redis(connection_pool = pool)
redisConn.publish("channel_1", "Hello")
redisConn.publish("channel_2", "World")
通过 Pub-Sub 可以作为消息通知机制使用(类似于 Kafka)
分布式锁
SET key value EX 120 NX
该命令是原子操作,表示只有在 key 不存在的情况下,才会赋值成功,并且 120 秒后会自动删除,这样就实现了带超时时间的互斥锁功能,获得锁的程序删除 key 就是释放了锁,如果程序出错退出,达到超时时间后也会保证锁能被释放
??
这种方法在比较极端的情况下可能失效
??Redis 是分布式部署
??程序 A 获取锁
??master 将数据同步到 slave 之前 master 宕机了
??slave 被选为 master 且没有 A 的锁
??程序 B 获取锁成功,这时候程序 A 还在正常运行,导致两个程序同时获得锁