今天和大家来聊一聊Redis的Stream类型,Redis从5.0开始引入了一种新的数据类型Stream类型,它是专门为消息队列设计的数据类型。
首先,我们先来看一下消息队列存取消息的过程。在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件把消息发送到消息队列,我们称之为生产者,另一个组件消费消息,然后处理,我们称之为消费者。如下图所示:
消息队列主要是解决生产者和消费者处理能力不一致的问题。也经常是大厂中必不可少的中间件。
Redis在5.0之前就有一个基于发布订阅(pub/sub)来实现消息队列的功能,但是,它有个缺点就是当出现网络断开、Redis宕机等,消息就会被丢弃。而Redis Stream提供了消息的持久化和主从复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
首先,我们看Redis Stream提供了哪些操作命令:
- XADD 添加消息到末尾
- XTRIM 对流进行修剪,限制长度
- XLEN 获取流包含的元素数量,即消息长度
- XDEL 删除消息
- XRANGE 获取消息列表,会自动过滤已经删除的消息
- XREAD 以阻塞或非阻塞方式获取消息列表
- XGROUP CREATE 创建消费组
- XREADGROUP 按消费者的形式来读取消息
- XACK 用于向消息队列确认消息处理已完成
- XPENDING 查询每个消费组内所有消费者已读取但尚未确认的消息
一、常用命令
1. XADD
XADD test * invent 5 1629701120782-0
表示往名称为test的消息队列中插入一条消息,消息的key是invent,值是5。其中test后面的“*”表示给插入的消息自动生成一个全局唯一的ID,1629701120782-0,前半部分1629701120782表示服务器的时间,后半部分0表示插入消息在当前毫秒内的消息序号,从0开始。1629701120782-0表示1629701120782毫秒内的第一条消息。
2. XTRIM
XTRIM test maxlen 100
表示设置test的消息队列的长度为100,当Stream达到最大长度时,老的消息就会被删除。由于stream内部的实现机制,精确设置一个长度上限,消耗的资源会比较大,所以我们一般采用模糊设置的方式:XTRIM test maxlen ~ 100,意思是长度可以超过100一些,可以是103、110等,由redis自行判断什么时候去截断。
3. XLEN
XLEN test
表示获取消息队列test的长度。
4. XDEL
XDEL test 1629701684031-0
表示删除消息队列test中,ID为1629701684031-0的消息。
5. XRANGE
XRANGE test - +
表示获取test指定范围内的消息,其中“-”代表最小id,“+”代表最大id。
6. XREAD
XREAD block 10000 streams test $
表示读取消息,其中“$”代表最新消息,“block 10000”其中10000的单位是毫秒,代表10s。表示XREAD在读取消息时,如果没有消息到来,XREAD将阻塞10s,然后再返回。
7. XGROUP
XGROUP CREATE test mygroup 0
表示给消息队列test创建一个消费组mygroup,0表示从最开始的位置读。
8. XREADGROUP
XREADGROUP group mygroup consumer1 streams test >
消费组mygroup中的消费者consumer1从消息队列test中读取所有消息,其中“>”表示从第一条尚未被消费的消息开始读取。
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者消费了,就不能再被该消费组内的其他消费者读取了。使用消费组的目的是让组内的多个消费者共同分担读取消息。所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
9. XPENDING
XPENDING test mygroup
为了保证消费者在发生故障重启后,仍然可以获取到未处理完的消息,Streams会自动使用内部队列,来存放消费组里每个消费者读取的消息,直到消费者使用XACK命令,通知Streams,消息已处理完成。当消费者重启后,可以使用XPENDING命令查看已读取,但是没有被确认完成的消息。
10. XACK
XACK test mygroup 1629701684031-0
表示消费组mygroup已经确认处理了test消息队列中id为1629701684031-0的消息。
到此为止,我们已经了解了使用Stream类型来实现消息队列的用法了。
二、总结
你也许会想,我们为什么要用Redis来做消息队列呢?要使用消息队列,就应该使用kafka、RabbitMQ这些专门的消息队列中间件,而Redis更适合做缓存。其实,我认为,采用什么技术和你当前所遇到的应用场景有关,如果你的消息通信不大,对数据丢失不敏感,那么用redis作为消息队列不失为一个好的方法,毕竟redis相比kafka等专业的消息系统来说,更加轻量级,维护成本也低。
今天我们就聊到这里,更多有趣知识,请关注【程序员学长】,回复【资料】,可以获得上百本技术类电子书,设计java、redis、python、数据结构和算法等。
你知道的越多,你的思维也就越开阔,我们下期再见。