Redis类型 Stream Bitfield

Stream类型就是Redis里的mq,是redis为了占领市场份额的产物 

今天我们就来介绍一下Stream

Redis的消息队列一般是两个方案

第一个是Lpush Rpop 队列的异步队列方案(一对一)

第二个方案就是pubsub(发布订阅)模式 (一对多)

注:这里如果没有消费者了,队列中的数据就直接被丢弃了,没有持久化

Redis Stream是在Redis5引入的,支持消息持久化,全局自动生成唯一id,支持消息ack等等

底层数据结构

首先消息队列的底层是一个链表结构

解释一下这里的参数    

Message Content  消息内容

Consumer group 消费组  含有多个消费者

Last_delieved_id      游标  可以理解为读完一个就向后走一个,消费完成 表示哪些是新的消息,哪些是消费过的消息

   

消费者读完确认了就是ack   类似于TCP中的ack

这里记录的就是读过了但是没有签收的消息id 如果客户端没有ack 这里面的消息就会越来越多    如果客户段发送ack就开始减少    主要用来确保消息至少被消费了一次,而不是在网络传输中被丢失

有关Redis指令

xadd mystream * k1 v1 

这里*表示自动生成消息ID

这里的返回值就是毫秒时间戳和该毫秒产生的第一条消息

注:同一时间产生的消息不允许相同,但是可以产生不同的消息

type mystream

我们还可以查看一下类型如上文所见

xrange 查看消息列表

xrange mystream - +

注:这里使用count来分页  和mysql中的limit一样

xrevrange mystream + -

就是将原来的输出掉个个儿

xdel mtstream 主键

注意这里是按照主键删除,也就是按照时间戳+第一条信息的id删除

注:这里如果出现以下错误,我们只需要将配置文件修改即可(推荐使用yes保证数据快照的时效性)

xlen key

查看长度

xtrim截取操作

可以按照时间截取也可以按照最大长度获取

我们首先加点数据

我们执行操作就可以看到数据只剩两个了

同样的我们可以规定截取在某时间戳之后产生的消息

xread count n (block) streams key 符号

这里streams是一个关键字

这条指令用于获取消息,以阻塞或者非阻塞的方式或者比某个时间点大的n条消息

这里符号如果填写$就是表示目前队列中的最新消息的下一个消息

如果能获取到,返回,获取不到返回nil

0-0就是从最小的消息开始读取返回,没有限制就是类似于遍历整个队列

block 0 就是无限阻塞

我们这里举个例子就是获取最新的下一条信息 $  然后我们重新开一个客户端来进行修改看能不能返回

新的客户端

旧的客户端

消费组指令

队列里面也有消息了,那如何来消费呢?

下面我们就开始讲解消费组

创建消费组

例:xgroup create mystream groupX $   从尾部开始消费

xgroup create mystream groupA 0   从首部开始消费

 

注:此时2号消费者就读不到了,因为一旦被本组任意消费者消费了,指针就走到头了,本组其他消费者就不能再读取了

消费组的目的:

每个消费者读取一部分数据  这样就能达成负载均衡的效果

消息的ack机制

xpending key group

查看xpending队列中的消费组消费情况

这里就是groupA中的consumer1消费了三条数据

一旦消息被ack了之后,这里的消息队列中的数据就可以删除了

我们这里查看一下groupA的xpending数据

xpending stream groupA - + 10  consumer1

我们发现签收完了悬而未决的list中就删除了

注:这里队列中不会删除

xinfo 用于打印消费者分组的情况

xinfo stream mystream

使用建议

Stream不能100%代替kafka.....建议慎用

上一篇:K8S 运维管理


下一篇:jmeter性能压测的标准和实战中会遇到的问题