Redis实现消息队列

今天和大家来聊一聊Redis的Stream类型,Redis从5.0开始引入了一种新的数据类型Stream类型,它是专门为消息队列设计的数据类型。
首先,我们先来看一下消息队列存取消息的过程。在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件把消息发送到消息队列,我们称之为生产者,另一个组件消费消息,然后处理,我们称之为消费者。如下图所示:

Redis实现消息队列

   消息队列主要是解决生产者和消费者处理能力不一致的问题。也经常是大厂中必不可少的中间件。 Redis在5.0之前就有一个基于发布订阅(pub/sub)来实现消息队列的功能,但是,它有个缺点就是当出现网络断开、Redis宕机等,消息就会被丢弃。而Redis Stream提供了消息的持久化和主从复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
首先,我们看Redis Stream提供了哪些操作命令:

  • XADD 添加消息到末尾
  • XTRIM 对流进行修剪,限制长度
  • XLEN 获取流包含的元素数量,即消息长度
  • XDEL 删除消息
  • XRANGE 获取消息列表,会自动过滤已经删除的消息
  • XREAD 以阻塞或非阻塞方式获取消息列表
  • XGROUP CREATE 创建消费组
  • XREADGROUP 按消费者的形式来读取消息
  • XACK 用于向消息队列确认消息处理已完成
  • XPENDING 查询每个消费组内所有消费者已读取但尚未确认的消息

一、常用命令

1. XADD

 XADD test * invent 5 
 1629701120782-0

  表示往名称为test的消息队列中插入一条消息,消息的key是invent,值是5。其中test后面的“*”表示给插入的消息自动生成一个全局唯一的ID,1629701120782-0,前半部分1629701120782表示服务器的时间,后半部分0表示插入消息在当前毫秒内的消息序号,从0开始。1629701120782-0表示1629701120782毫秒内的第一条消息。

2. XTRIM

XTRIM test maxlen 100

 表示设置test的消息队列的长度为100,当Stream达到最大长度时,老的消息就会被删除。由于stream内部的实现机制,精确设置一个长度上限,消耗的资源会比较大,所以我们一般采用模糊设置的方式:XTRIM test maxlen ~ 100,意思是长度可以超过100一些,可以是103、110等,由redis自行判断什么时候去截断。

3. XLEN

XLEN test

   表示获取消息队列test的长度。

4. XDEL

XDEL test 1629701684031-0

   表示删除消息队列test中,ID为1629701684031-0的消息。

5. XRANGE

XRANGE test - +

   表示获取test指定范围内的消息,其中“-”代表最小id,“+”代表最大id。

6. XREAD

XREAD block 10000 streams test $

   表示读取消息,其中“$”代表最新消息,“block 10000”其中10000的单位是毫秒,代表10s。表示XREAD在读取消息时,如果没有消息到来,XREAD将阻塞10s,然后再返回。

7. XGROUP

XGROUP CREATE test mygroup 0

   表示给消息队列test创建一个消费组mygroup,0表示从最开始的位置读。

8. XREADGROUP

XREADGROUP group mygroup consumer1 streams test >

   消费组mygroup中的消费者consumer1从消息队列test中读取所有消息,其中“>”表示从第一条尚未被消费的消息开始读取。
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者消费了,就不能再被该消费组内的其他消费者读取了。使用消费组的目的是让组内的多个消费者共同分担读取消息。所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

9. XPENDING

XPENDING test mygroup

   为了保证消费者在发生故障重启后,仍然可以获取到未处理完的消息,Streams会自动使用内部队列,来存放消费组里每个消费者读取的消息,直到消费者使用XACK命令,通知Streams,消息已处理完成。当消费者重启后,可以使用XPENDING命令查看已读取,但是没有被确认完成的消息。

10. XACK

XACK test mygroup 1629701684031-0

   表示消费组mygroup已经确认处理了test消息队列中id为1629701684031-0的消息。
到此为止,我们已经了解了使用Stream类型来实现消息队列的用法了。

二、总结

   你也许会想,我们为什么要用Redis来做消息队列呢?要使用消息队列,就应该使用kafka、RabbitMQ这些专门的消息队列中间件,而Redis更适合做缓存。其实,我认为,采用什么技术和你当前所遇到的应用场景有关,如果你的消息通信不大,对数据丢失不敏感,那么用redis作为消息队列不失为一个好的方法,毕竟redis相比kafka等专业的消息系统来说,更加轻量级,维护成本也低。

   今天我们就聊到这里,更多有趣知识,请关注【程序员学长】,回复【资料】,可以获得上百本技术类电子书,设计java、redis、python、数据结构和算法等。 

   你知道的越多,你的思维也就越开阔,我们下期再见。

Redis实现消息队列

 

 

Redis实现消息队列

上一篇:调用webapi 错误:使用 HTTP 谓词 POST 向虚拟目录发送了一个请求,而默认文档是不支持 GET 或 HEAD 以外的 HTTP 谓词的静态文件。的解决方案


下一篇:webapi的Ioc的构造注入