RocketMQ知识整理

目录

一、基本原理及特性

1.文档、资料地址

2.架构设计

3.应用场景

4.特点

5.名词解释

二、进阶

1.消息存储

2.事物消息

3.消息刷盘

4.负载均衡

三、常见问题

1.Consumer是如何从Broker获取消息的?Push or Pull? 

2.如何保证消息可靠性

3.rocketMQ的消息堆积如何处理

4.customer和queue的对应关系

5.顺序消息是怎么实现的

四、如何设计消息中间件


一、基本原理及特性

1.文档、资料地址

2.架构设计

RocketMQ知识整理

1)结合部署架构图,描述集群工作流程:

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

2)Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

3)Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

4)NameServer 主要包括两个功能:

  • Broker管理:NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理:每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。

4)Broker:Broker主要负责消息的存储、投递和查询以及服务高可用

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

3.应用场景

  • 削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ版可提供削峰填谷的服务来解决该问题。

  • 异步解耦

交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ版可实现异步通信和应用解耦,确保主站业务的连续性。

  • 顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ版提供的顺序消息即保证消息FIFO。

  • 分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

4.特点

  • 分布式事务消息:实现类似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。
  • 定时(延时)消息:允许消息生产者指定消息进行定时(延时)投递,最长支持 40 天。
  • 大消息:支持最大 4 MB 消息。
  • 消息轨迹:通过消息轨迹,能清晰定位消息从发布者发出,经由消息队列 RocketMQ 服务端,投递给消息订阅者的完整链路,方便定位排查问题。
  • 广播消费:允许同一个 Group ID 所标识的所有 Consumer 都各自消费某条消息一次。
  • 顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费。
  • 重置消费进度:根据时间重置消费进度,允许用户进行消息回溯或者丢弃堆积消息。
  • 死信队列:将无法正常消费的消息储存到特殊的死信队列供后续处理。
  • 全球消息路由:用于全球不同地域之间的消息同步复制,保证地域之间的数据一致性。

5.名词解释

1)Message Trace (消息轨迹)

在一条消息从生产者发出到订阅者消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由消息队列 RocketMQ 服务端,投递给消息消费者的完整链路,方便定位排查问题。

2)Reset Consumer Locus (重置消费位点)

以时间轴为坐标,在消息持久化存储的时间范围内(默认 3 天),重新设置消息消费者对其订阅 Topic 的消费进度,

设置完成后订阅者将接收设定时间点之后由消息生产者发送到消息队列 RocketMQ 服务端的消息。

3)Dead Letter Queue (死信队列)

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数(默认16次)后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

消息队列 RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message)

消息队列 RocketMQ 将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

二、进阶

1.消息存储

RocketMQ知识整理

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

  • (1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。

单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,磁盘顺序写速度特别快,这也是RocketMQ高性能的原因,当文件满了,写入下一个文件;

  • (2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。

Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如:topic/queue/file三层组织结构.

具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

  • (3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

2.事物消息

RocketMQ知识整理

  1. 发送方向消息队列 RocketMQ 服务端发送消息。
  2. 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作。

3.消息刷盘

RocketMQ知识整理

(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

3.消息过滤

  Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

4.负载均衡

通过Topic在多Broker中分布式存储实现

producer端 默认策略是随机选择:

  • producer维护一个index
  • 每次取节点会自增
  • index向所有broker个数取余
  • 自带容错策略

consumer端 默认是平均分配算法

其他负载均衡算法:环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)

三、常见问题

1.Consumer是如何从Broker获取消息的?Push or Pull? 

RocketMQ知识整理

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

从下面这张简单的示意图也可以大致看出其中的差别,相当于是说,push的方式是:消息发送到broker后,如果是push,则broker会主动把消息推送给consumer即topic中,而pull的方式是:消息投递到broker后,消费端需要主动去broker上拉消息,即需要手动写代码实现,

2.如何保证消息可靠性

1.Producer端:

  • 采取send()同步发消息,发送结果是同步感知的。
  • 发送失败后可以重试,设置重试次数。默认3次。

2.Broker

  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
  • 集群部署,主从模式,高可用。

3.Consumer端

  • 完全消费正常后在进行手动ack确认。

3.rocketMQ的消息堆积如何处理

consumer水平扩容,即上线更多consumer来消费消息

如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?

  • 准备一个临时的topic
  • queue的数量是堆积的几倍
  • queue分布到多Broker中
  • 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去
  • 上线N台Consumer同时消费临时Topic中的数据
  • 改bug
  • 恢复原来的Consumer,继续消费之前的Topic

4.customer和queue的对应关系

https://blog.csdn.net/qq_34930488/article/details/101282436

5.顺序消息是怎么实现的

rocketmq怎么保证队列完全顺序消费? - 知乎

四、如何设计消息中间件

本质:一种具备接受请求、保存数据、发送数据等功能的网络应用

1.高可用

1)主从模式

2)可横向扩展

2.高可靠

1)消息传输高可靠

2)消息存储高可靠

3.协议

提高各组件之间的通信速度

4.消息分发

消息分发主要保证的就是,一条消息到底要发送给谁去消费,以及采用何用策略去分配,对于一个消费失败的数据是否可以进行重发等

上一篇:阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台


下一篇:阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台