实战应用
maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
普通消息
消息发送
1、Producer端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public static void syncProducer() throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("Group_A");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message message = new Message("TopicTest", "TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
// 不再发送消息,关闭Producer实例
producer.shutdown();
}
2、发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public static void asyncProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("Group_A");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch2 = new CountDownLatch2(messageCount);
for(int i = 0; i < messageCount; i++){
final int index = i;
Message message = new Message("TopicTest", "TagB",
"OrderID909", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallBack接受异步返回结果的回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
System.out.printf("%-10d Exception %s %n", index, throwable);
throwable.printStackTrace();
}
});
}
// 等待5s
countDownLatch2.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
3、发送单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public static void onewayProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("Group_A");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < messageCount; i++) {
Message message = new Message("TopicTest", "TagC",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送单向消息,没有返回结果
producer.sendOneway(message);
}
producer.shutdown();
}
消费消息
public static void consumer() throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer_GroupA");
//设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgList);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者实例
consumer.start();
System.out.println("Consumer Started");
}
发送延时消息
只需要给消息设置 DelayTimeLevel
延时等级即可
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
在局部有序中:一个MessageQueue只能由一个消费者消费,且只能单线程消费。但是这个消费者可以开启多线程,同时消费多个MessageQueue。
顺序消息生产
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。
生产消息时,订单号相同的消息会被先后发送到同一个队列 MessageQueue
中;消费时,同一个OrderId获取到的肯定是同一个队列。
/**
* 顺序消息生产
*/
public static void orderProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("ProducerA");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size(); //取模
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
顺序消息消费
/**
* 顺序消息消费
*/
public static void orderConsumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerA");
consumer.setNamesrvAddr("localhost:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
- 重点:每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
从 consumer.registerMessageListener(new MessageListenerOrderly()
代码可以知道,顺序消息使用 MessageListenerOrderly
来告诉消费者进行顺序消费消息,并且只能单线程去消费同一个queue。而普通消息使用 MessageListenerConcurrently
进行并发消费消息。
事务消息
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
创建事务型生产者
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。
public static void transactionProducer() throws Exception{
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message message = new Message("MyTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println(sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
实现事务的监听接口
当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
//返回未知,让Broker回查事务状态
return LocalTransactionState.UNKNOW;
}
/**
* 检查本地事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if(null != status){
switch (status){
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息流程
分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1、事务消息发送及提交:
(1) 发送消息(half半消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2、补偿流程(回查):
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
最佳实践
Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用*设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
Keys的使用
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
事务消息
执行本地事务 executeLocalTransaction
时,如果业务执行失败,可明确告知回滚,直接返回 Rollback
;如果业务成功,不建议直接返回 Commit
,而是建议返回 UNKNOW
。
然后在进行事务回查 checkLocalTransaction
时如果能明确事务成功,才返回 Commit
。如果不能明确本地事务成功,返回 UNKNOW
,服务端默认回查15次。
消息发送失败处理方式
Producer的send方法本身支持内部重试,重试逻辑如下:
- 至多重试2次。
- 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
- 如果本身向broker发送消息产生超时异常,就不会再重试。
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
参考资料
rocketmq/docs/cn at master · apache/rocketmq (github.com)