作者:凯易&耘田
审核校对:白玙
编辑&排版:雯燕
前言:随着 RocketMQ 5.0 preview 的发布,5.0 的重大特性逐步与大家见面。POP Consumer 作为 5.0 的一大特性,POP 消费模式展现了一种全新的消费模式。其具备的轻量级,无状态,无队列独占等特点,对于消息积压场景,Streaming 消费场景等都非常友好。在介绍 POP Consumer 之前,我们先回顾一下目前使用较多的 Push Consumer。
Push Consumer
熟悉 RocketMQ 的同学对 Push Consumer 肯定不会陌生,客户端消费一般都会使用这种消费模式,使用这种消费模式也比较简单。我们只需简单设置,并在回调方法 ConsumeMessage 中写好业务逻辑即可,启动客户端应用就可以正常消费消息了。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("test_topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
那么 Push Consumer 是如何消费消息的呢?
当然,Consumer 收到消息的前提是 Producer 先发消息发到 Topic 当中。Producer 使用轮询的方式分别向每个 Queue 中发送消息,一般消费端都不止一个,客户端启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。
这里有个小问题:可以一直增加客户端的数量提升消费能力吗?当然不可以,因为 Queue 数量有限,客户端数量一旦达到 Queue 数量,再扩容新节点无法提升消费能力,因为会有节点分配不到 Queue 而无法消费。
客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。不是 Push 客户端吗?怎么会是客户端向 Broker 拉消息,不应该是 Broker 推消息到客户端吗?这是一个很有意思的点,因为 RocketMQ 无论是 Push Consumer,还是 Pull Consumer,还是后面要介绍的 POP Consumer,都是客户端拉的方式消费消息。Push Consumer 只是通过客户端 API 层面的封装让我们感觉是 Broker 推送的。
经过客户端负载均衡以及拉消息,客户端就可以正常消费消息了。
完整的的Push Consumer处理逻辑可以看下上面这张图,我们可以看到Push Consumer完整处理流程。
首先客户端 Rebalance 确定哪些 Consumer 客户端处理哪些 Queue,然后通过 PullMessageService 服务拉取消息,拉取到消息以后 ConsumeMessageConcurrentlyService 提交消费请求到消息消费线程池,然后调用回调方法 ConsumeMessage,到这里就可以拿到消息处理业务了,最后消费成功更新本地 offset 并上报 offset 到 Broker。如果消费失败(抛异常,超时等),客户端会发送 sendBack 告诉 Broker 哪些消息消费失败了,Broker会将消费失败的消息发送到延时队列,延时后再放到retry Topic,客户端消费retry Topic完成消息重投。这样做的好处是不会因为部分消费失败的消息而影响正常消息的消费。想了解细节的同学可以到 github 下载源码对照这张图看一下实际的代码处理流程。
通过前面 Push Consumer 的介绍,我们对 Push Consumer 原理有了一定的认识。我们可以发现,RocketMQ 的客户端做了很多事情,负载均衡,拉消息,消费位点管理,消费失败后的 sendBack 等等。这对多语言支持无疑是不友好的。参与过多语言开发的同学应该会感同身受,将这么多的逻辑移植到不同的语言,肯定不是一件简单的事情。同时客户端的升级运维也会增加难度。
所以我们思考可不可为客户端瘦身,把一部分逻辑从客户端移到 Broker?当然是可以的,前面介绍 Push Consumer 客户端负责均衡的时候,我们可以发现,负载均衡需要的信息,所有ConsumerId,原本就是客户端从 Broker 获取的,所有 Queue 信息,Broker 也可以通过 nameServer 拿到,负责均衡算法在客户端还是 Broker 端调用也没有什么大的差异,所以把 Rebalance 移植到 Broker 是一个不错选择,Broker 负载均衡可以跟客户端负责均衡达到基本相同的效果,客户端逻辑会减少,多语言实现更加简单,后续升级运维也会更加可控。除此以外因为 Broker 相对客户端具有全局信息,还可以做一些更有意思的事情。例如在负责均衡的时候根据 Queue 的积压情况做负载均衡,将一些压力比较大的客户端上的 Queue 分配给其它客户端处理等等。
POP Consumer
通过前面 Push Consumer 的介绍,我们了解到 Push Consumer 的一些特点。
-
队列独占:Broker 上的每个队列只能分配到相同 Consumer group 的一台 Push Consumer 机器上。
-
消费后更新 offset:每次 Pull 请求拉取批量消息到本地队列缓存,本地消费成功才会 commit offset。
以上特点可能会带来一些问题,比如客户端异常机器 hang,导致分配队列消息堆积,无法消费。
RocketMQ 的 Push Consumer 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,处于半死不活的状态,与 Broker 的心跳没有断掉的时候,客户端 Rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 Rebalance 导致消费延迟影响等无法避免的问题。如下图所示:
当 Push Consumer 2 机器发生 hang 的时候,它所分配到的 Broker 上的 Q2 出现严重的堆积。我们目前处理这种问题,一般可能是找到这台机器重启,或者下线。保证业务不受异常机器影响,但是如果队列挤压到一定程度可能机器恢复了也没办法快速追赶消费进度,这也是受 Push Consumer 的能力限制。
我们总结下 Push Consumer 存在的一些痛点问题:
-
富客户端,客户端逻辑比较重,多语言支持不友好;
-
客户端或者 Broker 升级发布,重启等 Rebalance 可能导致消费挤压;
-
队列占位,单队列与单 Consumer 绑定,单个 Queue 消费能力无法横向扩展;
-
机器 hang,会导致挤压。
基于上述问题,RocketMQ 5.0 实现了全新的消费模型-POP Consumer。
POP Consumer 能够解决上述稳定性和解除队列占位的扩展能力。
我们下面来简单看一下 POP Consumer 是如何消费消息的:
POP Client 从 Broker 的队列中发出 POP 请求消息,Broker 返回消息 message。在消息的系统属性里面有一个比较重要的属性叫做 POP_CK,POP_CK 为一条消息的 handler,通过一个 handler 就可以定位到一条消息。当消息消费成功之后,POP client 发送 ackMessage 并传递 handler 向 broker 确认消息消费成功。
对于消息的重试,当 POP 出一条消息之后,这条消息就会进入一个不可见的时间,在这段时间就不会再被 POP 出来。如果没有在这段不可见时间通过 ackMessage 确认消息消费成功,那么过了不可见时间之后,这条消息就会再一次的可见。
另外,对于消息的重试,我们的重试策略是一个梯度的延迟时间,重试的间隔时间是一个逐步递增的。所以,还有一个 changeInvisibleTime 可以修改消息的不可见时间。
从图上可以看见,本来消息会在中间这个时间点再一次的可见的,但是我们在可见之前提前使用 changeInvisibleTime延长了不可见时间,让这条消息的可见时间推迟了。当用户业务代码返回 reconsumeLater 或者抛异常的时候,我们就可以通过 changeInvisibleTime 按照重试次数来修改下一次的可见时间了。另外如果消费 RT 超过了 30 秒(默认值,可以修改),则 Broker 也会把消息放到重试队列。
除此以外,POP 消费的位点是由 Broker 保存和控制,而且 POP 消费是可以多个 Client 消费同一个队列,如下图所示:
三个客户端并不需要 Rebalance 去分配 Queue,取而代之的是,它们都会使用 POP 请求所有的 Broker 获取消息进行消费。即使 POP Consumer 2 出现 hang,其内部消息也会让 POP Consumer1 和 POP Consumer3 进行消费。这样就解决了 hang 机器可能造成的消费堆积问题。
从整体流程可见,POP 消费可以避免 Rebalance 带来的消费延时,同时客户端可以消费 Broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。
同时扩展能力提升,POP Consumer 可以消费同一 Topic 下所有 Queue,相比 Push Consumer 解除了每个 Queue 必须 Rebalance 到一台客户端消费的限制,Push Consuner 客户端数量最多只能等于 Queue 的数量。POP Consumer 可以突破这个限制,多个 POP Consumer 可以消费同一个 Queue。
Broker 实现
POP Consumer 在 Broker 端是如何实现的呢?
POP Consumer 拉取消息后,会在 Queue 维度上加锁,保证同一时刻只有一个客户端可以拉去到同一个 Queue 的消息。获取到消息后,会保存 checkPoint 信息在 Broker,checkPoint 信息主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等信息。checkPoint 信息会优先保存到 buffer 当中,等待 ack 消息,在一段时间内收到客户端回复的 ack 消息,对应的 checkPoint 信息从 buffer 中移除,并且更新消费进度,标识消息消费成功。
当 checkPoint 消息在 buffer 中等待一段时间,一直未等到 ack 消息时,checkPoint 信息会清理出 buffer 并发送 ck msg 到 store,ck msg 首先被发送到延时队列 SCHEDULE_Topic_XXXX 中,延时完成以后会进入 REVIVE_LOG Topic,REVIVE_LOG Topic 是保存在 store 当中待处理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的消息放到一个 map 当中,如果 ck 有对应的 ack 则会更新 REVIVE_LOG 的消费位点,标识消息消费完成,超时未被确认的 ck msg,会查询到 ck msg 对应的真实的消息,并把这个消息放到 retry Topic 当中,等待客户端消费,POP Consumer 正常消费的时候会概率性的消费到 retry Topic 中的消息。我们从这块设计中可以看到 RocketMQ 的常用设计,通过一些内部的 Topic 实现业务逻辑,事务消息,定时消息都用了这种设计方式。
我们简单终结一下 POP Consumer 的优势:
-
无状态,offset 信息 Broker 维护,客户端与 Queue 无绑定。
-
轻量级,客户端只需要收发消息,确认消息。
-
无队列占位,Queue 不再与客户端绑定。
-
多语言友好,方便多语言移植。
-
升级更可控,逻辑都收敛到 Broker,升级更加方便可控。
POP&Push 融合
既然 POP 有这么多优势,我们能否使用 POP 解决 Push 的一些问题呢?前面我们提到 Push Consumer 当一个队列因为 Consumer 问题已经堆积很多的时候,受限于单个 Consumer 的消费能力,也无法快速的追赶消费进度,延迟会很高。核心问题是单队列单 Consumer 的限制,导致消费能力无法横向扩展。
我们希望通过 POPAPI 的形式,当一个队列堆积太多的情况下,可以切换到 POP 模式,有机会让多个 Consumer 来一起消费该队列,追赶进度,我们在 5.0 的实现中也实现了这一点。
POP/Push 模式切换方式
可以通过两种方式进行切换。
1、命令行
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
2、代码切换
public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String TOPIC = "TopicTest";
// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.start();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}
通过下面 POP Consumer Demo,我们看到 POP Consumer 跟 Push API 基本是统一,使用也比较简单,相比 Push API 只是多了一步消费模式切换。
Push & POP Retry 队列差异
在使用 POP 消费模式时我们只需要在 Push API 的基础上切换模式即可,对于 Broker 来说还是需要做一些处理的。主要需要处理的地方是 retry 队列。
Push 和 POP 模式对 retry 队列处理不一样
-
Push 的 retry 处理
1)服务端有一个 %RETRY%ConsumerGroup 队列
2)客户端会有拉取任务拉取这个队列的消息。 -
POP 的 retry 处理
1)服务端针对每个Topic,都有一个名为 %RETRY%ConsumerGroup_Topic 的 retry 队列
2)客户端没有专门针对 retry 队列的拉任务,每次普通 POP 请求都有一定概率消费相应的 retry 队列
模式切换之后,老模式的 retry 里的消息还需要继续处理,否则就丢消息了。
Push & POP 切换
Push 切换到 POP
- 正常队列切换到 POP 模式
- 正常队列的 POP 请求会处理对应的 POP retry 队列
- 针对 Push retry 队列,我们保留原来 Push retry 队列的拉取任务,并且是工作在 Push 模式。
POP 切换到 Push
- 正常队列切换到 Push 模式
- Push retry 队列自然有相应的拉取任务
- 之前 POP 的 retry 队列,我们在客户端自动创建拉取任务,以Push 模式去拉取。注意这
总结下来就是,对于 retry 队列,我们会特殊处理不参与模式切换。
总结
最后我们总结下 POP Consumer。POP 作为一种全新的消费模式,解决了 Push 模式的一些痛点,使客户端无状态,更加轻量,消费逻辑也基本都收敛到了 Broker,对多语言的支持十分的友好。在 API 层面也与 Push 完成了融合,继承了 Push API 的简单易用,同时实现了 Push,POP 之间的*切换。