RocketMq原理解析

文章目录

消费

一个Consumer实例可以消费Group下的所有主题的,少量一个Group订阅了多个集群的多个主题的消息,每个集群对应各自的消费者实例,总结一条判断方式:一个Group内MessageProcessor只有一个,所有订阅都以用此MessageProcessor处理。

Group名称

必须以cg_开头 只能输入数字、大小写字母、下划线、减号 不可以重复,即使删除重新创建,也不能使用原来的Topic名字

支持延迟消息&循环消息

DDMQ支持发送延时消息和循环消息, 可指定消息生效时间与消息发送次数

项目中采用的是不断遍历注册到容器的listener,进行逐一消费。

   String consumerGroupName = ddMqEventListener.consumerGroupName();
   eventListeners.put(consumerGroupName, ddMqEventListener);

在init的时候,通过监听器所分配的groupname分组监听器。通过遍历消费者map来获取groupname下的监听器,并分配线程来执行消费者的任务,最终交由监听器来执行onevent。

consumer配置在nacos,消费者与监听器的联系在于groupname。消费者通过groupname将获取到的任务交由监听器执行。解耦。

RocketMq 消费者

消费者主要分为两种:

  • DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;
  • DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。

主要对应推拉功能,Pull类型灵活度大于Push。

DefaultMQPushConsumer使用

使用 DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,而且加 入新的 DefaultMQPushConsumer后会自动做负载均衡 。

Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发处理能力, GroupName需要和消息模式 (MessageModel)配合使用。

RocketMQ支持两种消息模式: Clustering和Broadcasting。

  • Clustering模式下,同一个 ConsumerGroup(GroupName相同) 里的每 个 Consumer 只消费所订阅消 息 的一部分 内容, 同一个 ConsumerGroup 里所有的 Consumer消费的内容合起来才是所订阅 Topic 内容的整体, 从而达到负载均衡的目的 。
  • Broadcasting模式下,同一个 ConsumerGroup里的每个 Consumer都 能消费到所订阅 Topic 的全部消息,也就是一个消息会被多次分发,被 多个 Consumer消费。

Topic名称用来标识消息类型, 需要提前创建。 如果不需要消费某个Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤。

RocketMq原理解析

DefaultMQPushConsumer 的处理流程

PullMessage方法内:

DefaultMQPushConsuer的源码中有很多PullRequest语句,比如Default­ MQPushConsumerlmpl.this.executePullRequestlmmediately(pullRequest)。 为什 么“ PushConsumer”中使用“ PullRequest”呢 ? 这是通过“长轮询”方式达到 Push效果的方法,长轮询方式既有 Pull 的优点,又兼具 Push方式的实时性。

  • Push方式是 Server端接收到消息后,主动把消息推送给 Client端,实时 性高。 对于一个提供队列服务的 Server来说,用 Push方式主动推送有很多弊 端:首先是加大 Server 端的 工作量,进而影响 Server 的性能;其次, Client 的 处理能力各不相同, Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,会造成各种潜在问题 。
  • Pull方式是 Client端循环地从 Server端拉取消息,主动权在 Client手里, 自己拉取到一定量消息后,处理妥当了再接着取。 Pull 方式的问题是循环拉取 消息的间隔不好设定,间隔太短就处在一个 “忙等”的状态,浪费资源;每个Pull 的时间间隔太长 Server 端有消息到来时 有可能没有被及时处理 。

长轮询方式的局限性,是在 HOLD 住 Consumer 请求的时候需要占用资源, 它适合用在消息队列这种客户端连接数可控的场 景 中 。

DefaultMQPushConsumer 的流量控制

Pull 获得的消息, 如果直接提交到线程 池 里执行,很难监控和控 制 ,比 如,如何得知当前消息堆积的数 量 ?如 何 重复处理某些消息? 如何延迟处 理 某些消息? RocketMQ定义了一个快照类 ProcessQueue来解决这些问题,在 PushConsumer 运行的时候, 每个 Message Queue 都会有个对应的 ProcessQueue 对象,保存了这个 Message Queue 消息处理状态的快照 。

ProcessQueue对象里主要的内容是一个 TreeMap 和一个读写锁。 TreeMap 里以 Message Queue 的 Offset作为 Key,以消息内容的引用为 Value,保存了 所有从 MessageQueue 获取到,但是还未被处理的消息;

DefaultMQPullConsumer

( 1 )获取 Message Queue 并遍历

( 2 )维护 Offsetstore

( 3 )根据不同的消息状态做不同的处理

Consumer 的启动、关闭流程

Consumer分为 Push和l Pull两种方式,对于 PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程 。 需要注意的是 Offset 的保存,要在程序的异常处理部分增加把 Offset 写人磁盘方 面的处理,记准了每个 Message Queue 的 Offset,才能保证消息消 费 的准确性 。

DefaultMQPushConsumer 的退出, 要调用 shutdown() 函数, 以便 释放资 源、保存 Offset 等 。 这个调用要加到 Consumer 所在应用的退出逻辑中 。

RocketMq 生产者

用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以 是一台机器的多个进程,或者一个进程的多个 Producer 对象。一个 Producer Group 可以发送多个 Topic 消息,Producer Group 作用如下:

标识一类 Producer

可以通过运维工具查询这个发送消息应用下有多个 Producer 实例

发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意 一台机器来确认事务状态。

DefaultMQProducer

发送消息要经过五个步骤:

  1. 设置 Producer 的 GroupName。
  2. 设置 lnstanceName,当一个 Jvm 需要启动多个 Producer 的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
  3. 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数 。 想保证不丢消息,可以设置多重试几次 。
  4. 设置 NameServer 地址 。
  5. 组装消息并发送 。

消息发送的返回状态有如下四种 : FLUSH_DISK_SEND_TIMEOUT,FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的 。

写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑 。

发送延迟消息

迟消息的使用方法是在创建 Message对象时,调用 setDelayTimeLevel ( int level) 方法设置延迟时间, 然后再把这个消息发送 出去。

自定义消息发送规则

一个 Topic会有多个 Message Queue,如果使用 Producer的默认配置,这 个 Producer 会轮流向各个 Message Queue 发 送 消息 。 Consumer 在消费消息的 时候,会根据负载均衡策略,消费被分配到的 MessageQueue,如果不经过特 定的设置,某条消息被发往 l哪个 Message Queue,被哪个 Consumer 消费是未知的。

如果业务 需 要我们把消息 发 送到指定的 MessageQueue 里,比如把同 一 类型 的消息都发 往 相同的 Message Queue, 该怎么办呢 ?

可以用 Message­QueueSelector。

发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函 数发送消 息即可 。 在 MessageQueueSelector 的实现中,根据传人的 Object参 数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被 选中的 Message Queue。

ConsumeQueue:消息消费队列,消息到达 CommitLog 文件后,将异步转发到消息 消费队列,供消息消费者消费。

Message Queue:消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化。

对事务的支持

RocketMQ 采用两阶段提交 的方式实现事务消息:

  1. 发送方向 RocketMQ 发送“待确认”消息 。

  2. RocketMQ将收到的“待确认” 消息持久化成功后, 向发送方回复消息已经发送成功,此时第 一 阶段消息发送完成 。

  3. 发送方开始执行本地事件逻辑 。

  4. 发送方根据本地事件执行结果向 RocketMQ发送二次确认( Commit或 是 Rollback) 消息, RocketMQ收到 Commit状态则将第一阶段消息标记为可投 递,订阅方将能够收到该消息;收到 Rollback 状态则删除第一阶段的消息,订 阅方接收不到该消息 。

  5. 如果出现异常情况,步骤 4 )提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求。

  6. 发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工 作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer),通 过检查对应消息 的本地事件执行结果返回 Commit 或 Roolback 状态 。

  7. RocketMQ 收到回 查请求后,按照步骤 4 ) 的逻辑处理 。

todo事务消息实现 RocketMQ 在 4.x 的版本中将这部分 功能去除 。

如何存储队列位置信息 offset

  • message queue 是无限长的数组,一条消息进来下标就会涨1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
  • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费

实际运行 中的系统,难免会遇到重新消费某条消息、 跳过 一段时间内的消息等情况 。 这些异常情况的处理,都和 Offset 有关 。

RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一 般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息开始向后继续处理 。
RocketMq原理解析

在 DefaultMQPushConsumer里的 BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使 用 LocalfileOffsetStore,把 Offset存到本地 。

DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费 消 息。

注意设置读取位置不是每次都有效,它的优先级默认在 Offset Store后面 , 比如 在 DefaultMQPushConsumer 的 BROADCASTING 方式 下 ,默 认 是 从 Broker 里读取某个 Topic 对 应 ConsumerGroup 的 Offset, 当读 取不到 Offset 的时候, ConsumeFromWhere 的设置才生效 。 大部分情况下这个设置在 Consumer Group初次启动时有效。 如果 Consumer正常运行后被停止, 然后再启动, 会 接着上次的 Offset开始消费, ConsumeFromWhere 的设置元效。

NameServer

NameServer是整个消息队列中 的状态服务器,集群的各个组件通过它来了 解全局的信息 。 同时午,各个角色的机器都要定期 向 NameServer上报自己的状 态,超 时不上报的 话, NameServer 会认为 某个机器出故障不可用了,其他的组 件会把这个机器从可用列表里移除 。

NameServer本身是无状态的,也就 是说 NameServer 中的 Broker、 Topic 等状态信息不会持久存储,都是由各个角色 定时上报并存储到内存中的。

底层通信机制(基于netty)

RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有 意义的内容之间互相转换。

RocketMq原理解析

tmp_buf是预先申请的内存,这两个看似简单的操作,实际进行了 4次数据复制,分别是:从磁盘复制数据到内核态内存,从内核态内存复制到用户 态内存(完成了 read(file, tmp_b叫 len));然后从用户态内存复制到网络驱动 的内核态内存,最后是从网络驱动的内核态内存复制到网卡中进行传输(完成 write(socket, tmp_buf, len))。RocketMq采用零拷贝技术,提升消息的存盘与发送。

消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的, 消息真正的物理存储文件是 CommitLog, ConsumeQueue 是消息的逻辑队 列,类似数据库的索引文件,存储的是指向物理存储的地址 。

每个 Topic 下 的每个 Message Queue都有一个对应的 ConsumeQueue 文件 。 文件地址在 KaTeX parse error: Expected '}', got 'EOF' at end of input: {storeRoot}\consumequeue t o p i c N a m e {topicName} topicName{queueld}${fileName}。

CommitLog 以物理文件的方式存放,每台 Broker上的 CommitLog被本 机器所有 ConsumeQueue 共 享,文 件地址:$ {user.home} \store${commitlog}\ ${白leName}。 在 CommitLog 中,一个消息的存储长度是不固定的, RocketMQ 采取一些机制,尽量 向 CommitLog 中顺序写 ,但是随机读 。 ConsumeQueue 的 内容也会被写到磁盘里作持久存储。

RocketMq原理解析

存储结构优点:

  1. CommitLog 顺序 写 ,可以大大提 高写人效率 。
  2. 虽然是随机读,但是利用操作系统的 pagecache 机制,可以批量地从磁 盘读取,作为 cache存到内存中,加速后续的读取速度。
  3. 为了保证完全的顺序写,需要 ConsumeQueue 这个中间结构 ,因为 ConsumeQu巳ue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部 分的 ConsumeQueue 能够被全部读人内存,所以这个中间结构的操作速度很快, 可以认为是内存读取的速度 。 此外为了保证 CommitLog 和 ConsumeQueue 的一 致性, CommitLog 里存储了 Consume Queues、 Message k町、 Tag 等所有信息, 即使 ConsumeQueue 丢失,也可以通过 commitLog 完全恢复出来 。

RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足,需要把 Slave 转成 Master,则要手 动停止 Slave角色的 Broker,更改配置文件,用新的配置文件启动 Broker。

RocketMQ存储的特点:

  1. Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储

  2. consumerQueue 是个消费的逻辑队列,保存了数据在commit log中的offset

  3. 消费读取数据,需要先读取consumerQueue,再读取commit log,消息主体都是通过CommitLog来进行读写

同步刷盘和异步刷盘

RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以 让存储的消息 量超出内存的限制 。 RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写 。

  • 异步刷盘方式:在返回写成功状态时 ,消息可能只是被写人了内存的 PAGECACHE ,写操作的返回快,吞吐 量大 ;当内存里的消息 量 积累到 一定程度时 ,统一触发 写磁盘动 作,快速 写入。

  • 同步刷盘方式:在返回写成功状态时,消息已经被写人磁盘 。 具体流程 是,消息、写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后 等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功 的状态 。

同步刷盘还是异步刷盘,是通过 Broker 配置文件里的 flushDiskType 参数 设置的,这个参数被配置成 SYNC_FLUSH、 ASYNC_FLUSH 中的一个。

同步复制和异步复制

如果一个 Broker组有 Master和 Slave, 消息需要从 Master复制到 Slav巳 上,有同步和异步两种复制方式 。 同步复制方式是 等 Master 和 Slave 均写成功 后才反馈给客户端写成功状态;异步复制方式是只要 Master 写成功即可反馈给 客户端写成功状态 。

这两种复制方式各有优劣,在异步复制方式下,系统拥有较低的延迟和较 高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写人 Slave,有 可能会丢失;在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份 数据,容易恢复,但是同步复制会增大数据写人延迟,降低系统吞吐量 。

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置 的,这个参数可以被设置成 ASYNC MASTER、 SYNC MASTER、 SLAVE 三 个值中的一个 。通常情 况下,应该把 Master和 Save配置成 ASYNC FLUSH 的刷盘方式,主从之间配 置成 SYNC MASTER 的 复制方式,这 样即使有 一台机器出故障, 仍然能保证 数据不丢,是个不错的选择 。

顺序消息 (有问题)

顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的 所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

全局顺序消息

要保 证全局顺序消息, 需要 先把 Topic 的读写队列数设置为 一,然后 Producer 和 Consumer 的并发设置也要是一 。 简单来说,为了保证整个 Topic 的 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 。 这时 高并发、高吞吐量的功能完全用不上了 。

部分顺序消息

要保证部分消息有序,需要发送端和消费端配合处理 。 在发送端,要做到 把同一业务 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从 同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序 。

发送端使用 MessageQueueSelector类来控制 把消息发往哪个 MessageQueue。消费端通过使用 MessageListenerOrderly类 来解决单 Message Queue 的消 息被并发处理的问题。

消息重复问题

解决消息重复有两种方法:第一种方法是保证消费逻辑的幕等性(多次调 用和一次调用效果相同);另一种方法是维护一个巳消费消息的记录,消费前查 询这个消息是否被消费过 。 这两种方法都需要使用者自己实现 。

消息优先级

有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同 。 RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级 。需要自己定义消费逻辑。

消息的Tag和Key

对一个应用来说,尽可能只用一个 Topic,不同的消息子类型用 Tag来标识(每条消息只 能有一个 Tag),服务器端基于 Tag 进行过滤,并不需要读取消息体 的内容,所以效率很高 。发送消息设置了 Tag 以后,消费方在订阅消息时,才可以利用 Tag 在 Broker 端做消息过滤 。

消息的 Key。 对发送的消息设置好 Key,以后可以根据这个 Key 来 查找消息 。 所以这个 K巳y 一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便 。 Broker会创建专门的索引文件,来存 储 Key 到消息的映射,由于是哈希索引,应尽量使 Key 唯一 ,避免潜在的hash冲突。

使用 Tag 方式过滤非常高效, Broker 端可以在 ConsumeQueue 中做这种过滤,只从 CommitLog 里读取过滤后被命中的消息 。

以下是ConsumerQueue 的存储格式。

RocketMq原理解析

Consume Queue 的第三部分存储的是 Tag对应的 hashcode,是一个定长的字符串,通过 Tag 过滤的过 程就是对 比定长的 hashcode。 经过 hashcode 对比 , 符合要求的消息被从 CommitLog 读取出来,不用担心 Hash 冲突问题,消息在被消费前,会对比完整的 Message Tag 字符串,消除 Hash 冲突造成的误读 。

Broker 端进行消息过滤

Filter Server方式过滤,Filter Server 是 一 种比 SQL 表 达式更灵活的过滤方式,允许用户自 定 义 Java 函数,根据 Java 函数的逻辑对消息进行过滤。

要使用 Filter Server, 首先要在启动 Broker前在配置文件里加上 filterServer­ Nums= 3 这样的配置, Broker在启动的时候, 就会在本机启动3个Filter Server进程 。 Filter Server类 似 一 个 RocketMQ 的 Consumer 进程,它从 本 机 Broker 获取消息,然后根据用户上传过来的 Java 函数进行过滤,过滤后的消息 再传给远端的 Consumer。 这种方式会占用很多 Broker机器的 CPU 资源,要根 据实际情况谨慎使用 。 上传的 java代码也要经过检查 ,不能有申请大内存、创 建线程等这样的操作,否则容易造成 Broker服务器右机。

Consumer 处理能力

当 Consumer 的处 理速度眼不上消息的产生速度,会造成越来越多的消息 积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方 法可 以提高 Consumer 的处理能力 。

  1. 提高消费并行度

在同一个 ConsumerGroup 下( Clustering 方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在 已有机器中启动 多个 Consumer 进程都可以增加 Consumer实例数。注意总的 Consumer数量不要超过 Topic下 Read Queue 数量,超过的 Consumer 实例接收不到消息 。 此外,通过提高单个 Consumer 实例中的并行处理的线程数 可以在同一个 Consumer 内增加并行度 来提高吞吐量(设置方法是修改 consumeThreadMin 和 consumeThreadMax)。

  1. 以批量方式进行消费

通过批量方式消费来提高消费的吞吐 量 。 实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数 ,默 认是 1,如果设置为 N,在消息多的时候每次收到的是个长度为 N的消息链表。

  1. 检测延时情况,跳过非重要消息

Consumer 在消费的过程中, 如 果发 现由于某种 原因发生严重的消 息堆积, 短时间无法消除堆积,这个时候可以选择丢弃不重要 的消息,使 Consumer尽 快追上 Producer 的进度。

Consumer 的负载均衡

在 RocketMQ 中,负载均衡或者消息分配是在 Consumer端代码中完成的, Consumer从 Broker处获得全局信息,然后自己做 负载均衡,只处理分给自己的那部分消息 。

DefaultMQPushConsumer 的负载均衡

DefaultMQPushConsumer 的负载均衡过程不需要使用者操心,客户端程 序会自动处理,每个 DefultMQPushConsumer启动后,会马上会触发一个 doRebalance 动作;而且在同一个 ConsumerGroup 里加入新的 DefaultMQPush­Consumer时,各个 Consumer都会被触发 doRebalance动作。

DefaultMQPullConsumer 的负载均衡

Pull Consumer可以看到所有的 Message Queue, 而且从哪个 Message Queue读取消息,读消息时的 Offset都由使用者控制,使用者可以实现任何特 殊方式的负载均衡。

提高 Producer 的发送速度

RocketMQ 的 CommitLog 会有频繁的创建/删除动作 ,EXT4 创建/删除文件的性能比 EXT3 及其他文件系统要好。IO调度算法也推荐调整为 deadline。

IndexFile

消息索引文件,主要存储消息 Key 与 Offset 的对应关系。

消息消费队列是RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息队 列检索消息的速度 ,另外 RocketMQ 引入了 Hash 索引机制为消息建立索引, HashMap 的设 计包含两个基本点 :Hash 槽与 Hash 冲突的链表结构。

上一篇:卡夫卡消息队列


下一篇:dubbox REST服务使用fastjson替换jackson