【不同类型的消费者】
DefaultMQPushConsumer
由系统控制读取操作,收到消息后自动调用传入的处理方法来处理。
DefaultMQPullConsumer
读取操作中的大部分功能由使用者自动控制。
【DefaultMQPushConsumer的使用】
[特点]
1.系统收到消息后自动调用处理方法来处理消息,自动保存Offset。
2.加入的新的DefaultMQPushConsumer会自动做负载均衡。
public class QuickStart {
/**
* DefaultMQPushConsumer需要配置三个参数
* 1.Consumer的GroupName
* 2.NameServer的地址和端口号
* 3.Topic的名称
*/
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("unique_group_name_1"); //1.GroupName
Consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876"); //2.NameServer的地址和端口号
Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
Consumer.setMessageModel(MessageModel.BROADCASTING);
Consumer.subscribe("TopicTest", "*"); //3.Topic的名称
Consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + "Receive New Messages:" + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
Consumer.start();
}
}
[ 注意1:关于GroupName ]
RocketMQ支持两种消息模式:Clustering(集群消费) 和 Broadcasting(广播消费)。
1.Clustering模式 (即P2P模式)
同一个ConsumerGroup(即相同的GroupName)里的每个Consumer只消费订阅消息的一部分,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅的Topic内容的整体,从而达到负载均衡的目的。
2.Broading模式(即发布-订阅模式)
同一个ConsumerGroup里的每个Consumer都能消费到所订阅的Topic的消息,就是一个消息会被多次分发,被多个Consumer消费。
[ 注意2:关于NameServer配置 ]
NameServer的地址和端口号,可以填写多个,用 ";" 隔开,达到消除单点故障的目的。如"ip1:port;ip2:"
[ 注意3:Topic的配置 ]
Topic的名称用来标识消息类型,需要填创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:
//表示这个Consumer只消费"TopicTest"下的带有tag1、tag2、tag3的消息
Consumer.subscribe("TopicTest", "tag1||tag2||tag3");
Tag是发消息时设置的标签,在填写Tag参数的位置,用null或"*"表示要消费这个Topic的所有消息。
【DefaultMQPushConsumer的处理流程】
DefaultMQPushConsumer的主要功能实现在DefaultMQPushConsumerImpl中,消息处理在pullMessage的PullCallBack中。
在PullCallBack中有个Switch语句,根据Broker返回的消息类型做对应的处理。
DefaultMQPushConsumer的源码中有很多PullRequest语句,比如
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest)
为什么Push的Consumer中会出现PullRequest相关的代码呢?
通过长轮询的方式达到Push效果的方法,长轮询的方式既有Pull的优点,也有Push方式的实时性。
[ 补充——Push方式和Pull方式的区别 ]
1.Push方式
过程:Server收到消息后,主动把消息推送给Client端。
优点:实时性高。
缺点:
加大了Server端的工作量,会影响Server的性能。
Client端处理能力各不相同,Client的状态不收Server控制,如果Client端不能及时处理Server推送过来的消息,会造成各种潜在的问题。
2.Pull方式
过程:Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量的消息后,处理完成之后继续取。
缺点:
循环拉取的消息间隔时间不好设定,间隔太短就处于忙等状态,浪费资源;间隔太长,消息不能被及时处理。
[ 长轮询的方式 ]
长轮询的方式通过Client端和Server端的配合,达到了既有Pull方式的优点,也能达到保证实时性的目的。
[ 长轮询的发送Pull消息的代码片段 ]
拼接PullMessageRequestHeader,然后作为消息参数发送。
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.ConsumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueid());
requestHeader.setQueueOffset(Offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlaginner);
requestHeader.setCommitOffset(commitOffset);
//设置了Broker的最长阻塞时间,默认15秒,Broker没有消息时才阻塞,有消息会立刻返回。
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
[ 长轮询的Broker服务端代码 ]
//如果队列里没有消息
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning( * ); //等待5S
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().
getShortPollingTimeMills());
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > * ) {
Log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
}
从Broker的源码中看,服务端收到新消息请求时,如果队列里没有消息,并不急于返回,而是循环不断的查看状态,每次waitForRunning一段时间(默认5S),然后再check。默认情况下当Broker一直没有新消息,第三次check的时候,等待时间超过了RequestHeader里面的SuspendTimeoutMillis,就会返回空结果。
在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageAriving方法返回的请求结果。
[ 长轮询小结 ]
长轮询的核心是,Broker端HOLD住客户端发送过来的请求一小段时间,在这个时间里有新的消息到达,就利用现有的连接立即返回给Consumer。
长轮询的主动权还是掌握在Consumer手中,Broker即使有大量的消息积压,也不会主动推送给Consumer。
局限性:
在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接可控的场景。
【DefaultMQPush的流量控制】
PushConsumer的核心还是Pull方式,所以采用这种方式的客户端能根据自身的处理速度调整获取消息的操作速度。
PushConsumer有一个线程池,消息处理逻辑在各个线程里同时执行,线程池定义如下:
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
* ,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryimpl("ConsumeMessageThread")
);
Pull拉的消息,如果直接提交到线程池,很难监控和控制,比如当前消息堆积数量、消息是否重复执行、如何延迟处理某些消息。这些问题,都用一个快照类ProcessQueue来解决,在PushConsumer运行的时候,每个Message Queue都有个对应的ProcessQueue对象,保存这个Message Queue消息处理状态的快照。
[ ProcessQueue对象 ]
主要组成:一个TreeMap + 一个读写锁。
[ PushConsumer的流量控制 ]
有了ProcessQueue对象,流量的控制就方便多了。
PushConsumer会判断下面三个数据:
1.获取但还未处理的消息个数;
2.消息的总大小;
3.Offset的跨度;
任何一个值超过设定的大小就会隔一段时间再拉取,从而达到流量控制的目的。
【 DefaultMQPullConsumer 】
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的方法等。
[ PullConsumer示例代码 ]
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<>(); public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer Consumer = new DefaultMQPullConsumer("group_name_A");
Consumer.start();
Set<MessageQueue> mqs = Consumer.fetchSubscribeMessageQueues("TopicTest");
/** 1.获取MessageQueue,并遍历 **/
for (MessageQueue mq : mqs) {
/** 2.维护Offsetstore **/
long Offset = Consumer.fetchConsumeOffset(mq, true);
System.out.printf("Consume from the Queue:" + mq + "%n");
SINGLE_MQ:
while (true) {
try {
PullResult pullResult = Consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), );
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
/** 3.根据不同的消息状态做不同的处理 **/
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
Consumer.shutdown();
} }
} private static long getMessageQueueOffset(MessageQueue mq){
Long Offset = OFFSE_TABLE.get(mq);
if(Offset!=null){
return Offset;
}
return ;
} private static void putMessageQueueOffset(MessageQueue mq,long Offset){
OFFSE_TABLE.put(mq,Offset);
}
}
[ PullConsumer注意点 ]
示例代码是逐个读取某个Topic下的所有MessageQueue的内容,主要做了三件事:
1.获取MessageQueue,并遍历
一个Topic包含多个MessageQueue。
如果这个Consumer需要获取这个Topic下的所有消息,就要遍历所有的MessageQueue。如果有特殊情况,也可以选择某些特定的MessageQueue来读取消息。
2.维护Offsetstore
从一个MessageQueue里拉取消息时,要传入Offset参数,随着不断的读取消息,Offset不断怎张。此时需要由用户负责把Offset存储下来,可以根据具体情况存到内存、磁盘或者数据中。
3.根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回下面4种状态码:
/**
* 拉取消息状态码
*/
public enum PullStatus {
/**
* Founded
*/
FOUND,
/**
* No new message can be pull
*/
NO_NEW_MSG,
/**
* Filtering results can not match
*/
NO_MATCHED_MSG,
/**
* Illegal offset,may be too big or too small
*/
OFFSET_ILLEGAL
}
比较重要的是这2个状态码:
FOUND 获取到消息;
NO_NEW_MESSAGE 没有新的消息;
【Consumer的启动、关闭流程】
[1.PullConsumer]
PullConsumer的主动权很高,可以根据实际需要暂停、停止、启动消费者。
[ 注意 ]
PullConsumer的重点是Offset的保存,需要再代码中异常处理部分增加这样的处理:把Offset写入磁盘,记住每个MessageQueue的Offset,才能保证消息消费的准确性。
[ 2.PushConsumer ]
DefaultMQPushConsumer的退出:
调用shutdown()方法,以便释放资源,保存Offset等。(这个调用要加到Consumer所在应用的处理逻辑中)
PushConsumer启动:
PushConsumer启动的时候,会做各种配置的检查,然后连接NameServer获取Topic信息,启动时如遇到异常,如无法连接NameServer,程序依然可以正常启动不报错(日志里会有Warn信息)。
【为什么DefaultPushMQConsumer在无法连接NameServer时不报错?】
和分布式系统的设计有关,RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。
所以DefaultMQPushConsumer被设计成当发现某个连接异常时,不立即退出,而是不断尝试重新连接。
【如果要在DefaultMQPushConsumer启动的时候,及时暴露配置问题(及时报错),如何处理?】
可以在Consumer.start()语句后调用:Consumer.fetchSubscribeMessageQueues("TopicName"),这时如果配置信息不准确,或者当前服务不可用,会报MQClientException异常。