RocketMQ基础及应用
基础概念
消息消费模式
消息消费模式由消费者来决定,可以由消费者设置MessageModel来决定消息模式。消费模式默认为集群消费模式。
1.MessageModel.CLUSTERING - 集群消费:
使用集群消费模式时,MQ 认为任意一条消息只需要被集群(group)内的任意一个消费者处理即可。
- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时,不能保证路由到同一台机器上
- 消费状态由broker维护
2.MessageModel.BROADCASTING - 广播消费:
使用广播消费模式时,MQ 会将每条消息推送给集群内所有消费者,保证消息至少被每个消费者消费一次。
- 消费进度由consumer维护
- 保证每个消费者消费一次消息
- 消费失败的消息不会重投
消息发送方式
单向消息:
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
producer.sendOneway(message);
同步消息:
消息发送中进入同步等待状态,可以保证消息投递一定到达。
异步消息:
需要快速发送消息,也不想丢失的时候可以使用异步消息(常用)
producer.send(message,new SendCallback() {
public void onSuccess(SendResult sendResult) {
// 发送成功的回调
System.out.println("ok");
}
public void onException(Throwable e) {
// 发送失败的回调
e.printStackTrace();
System.out.println("err");
}
});
批量消息:
可以多条消息打包一起发送,减少网络传输次数提高效率。
- 批量消息要求必要具有同一topic、相同消息配置
- 不支持延时消息
- 建议一个批量消息最好不要超过1MB大小
- 如果不确定是否超过限制,可以手动计算大小分批发送
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs));
}
producer.send(Collection c) //方法可以接受一个集合 实现批量发送
延迟消息:
RocketMQ使用messageDelayLevel可以设置延迟投递
在broker.conf
中添加配置
//配置从1级开始,各级延时的时间,可以修改某级别的延时时间
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
在发送消息时设置 message.setDelayTimeLevel(1);
消息过滤
TAG:
可以使用TAG来对消息进行过滤。
在Producer生产者中使用Tag:
Message msg = new Message("TopicTest","TagTest" ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET));
在Consumer消费者中使用Tag:
// * 代表订阅Topic下的所有消息 可以用多个tag过滤消息
consumer.subscribe("TopicTest", "TagTest||TagTest2");
MessageSelector:
SQL表达式过滤,消费者将收到包含TAGA或TAGB的消息,但需要消费里面部分的消息,可以使用SQL表达式筛选出消息。
在broker.conf
中添加配置 enablePropertyFilter=true 并在启动broker的时候指定好配置文件。
//使用MessageSelector
MessageSelector selector = MessageSelector.bySql("order > 5");
consumer.subscribe("topicTest", selector);
可以支持基本语法:
- 数字比较:
>
,>=
,<
,<=
,BETWEEN
,=
; - 字符比较:
=
,<>
,IN
; -
IS NULL
或者IS NOT NULL
; - 逻辑运算
AND
,OR
,NOT
;
事务消息
1. 通过 Half Message 预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中。
2. Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。如果超过回调次数,默认回滚消息
事务状态:
//执行事务成功,确认提交
LocalTransactionState.COMMIT_MESSAGE
//回滚消息,broker端会删除半消息
LocalTransactionState.ROLLBACK_MESSAGE
//暂时为未知状态,等待broker回查
LocalTransactionState.UNKNOW
TransactionListener两个方法
executeLocalTransaction:半消息发送成功触发此方法来执行本地事务
checkLocalTransaction:broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
重试机制
producer生产者:
private int sendMsgTimeout = 3000; //默认超时时间
// 异步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendAsyncFailed(1);
// 同步发送时 重试次数,默认 2
producer.setRetryTimesWhenSendFailed(1);
// 是否向其他broker发送请求 默认false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
Consumer消费者:
消费失败时返回RECONSUME_LATER
broker投递:
只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试。
重投使用messageDelayLevel
参数
顺序消费
-
队列先天支持FIFO模型,单一生产和消费者下只要保证使用
MessageListenerOrderly
监听器即可 -
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。
-
并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
-
跟普通消息相比,顺序消息的使用需要在producer的send()方法中添加MessageQueueSelector接口的实现类,并重写select选择使用的队列,因为顺序消息局部顺序,需要将所有消息指定发送到同一队列中。
关键为以下几点:
1. 同一topic
2. 同一个QUEUE
3. 发消息的时候一个线程去发送消息
4. 消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly