一、RocketMQ事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
注意:
主要针对消息发送者。
RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
1、RocketMQ事务消息流程概要
上图说明了事务消息的大致方案,其中分为两个流程:
- 正常事务消息的发送及提交
- 事务消息的补偿流程。
1.1 正常事务流程
- (1) 发送消息(half 消息):图1。
- (2) 服务端响应消息写入结果:图2。
- (3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图3。
- (4) 根据本地事务状态执行 Commit 或者 Rollback(Commit操作生成消息索引,消息对消费者可见):图4。
1.2 事务补偿流程
- (1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图5。
- (2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图6。
- (3) 根据本地事务状态,重新 Commit 或者 Rollback::图7。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
2、 事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.
CommitTransaction
: 提交状态,它允许消费者消费此消息(完成了图中 1,2,3,4 步,第 4 步是 Commit)。 - TransactionStatus.
RollbackTransaction
: 回滚状态,它代表该消息将被删除,不允许被消费(完成了图中 1,2,3,4 步, 第 4 步是 Rollback)。 - TransactionStatus.
Unknown
: 中间状态,它代表需要检查消息队列来确定状态(完成了图中 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。
3、使用限制:
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认 情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用 户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事 务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。
二、代码演示
1、消息发送者
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态。
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者 producer,并指定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("mq_client_TransactionProducerGroup");
// 2.指定 Nameserver 地址,集群的话使用;分隔
producer.setNamesrvAddr("192.168.xxx.xxx:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置生产者回查线程池
producer.setExecutorService(executorService);
// 生产者设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 3.启动 producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
String orderId = UUID.randomUUID().toString();
Message message = new Message("TopicTest","Tag_Transaction2", orderId,
("下单,orderId: " + orderId +",hashcode:" + Objects.hash(orderId)).getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送消息:todo 半事务的发送
SendResult sendResult = producer.sendMessageInTransaction(message, orderId);
System.out.println("发送结果:" + sendResult);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
// 6.关闭生产者 producer实例
// producer.shutdown();
}
}
2、添加事务的监听接口
使用匿名内部类或者自定义事务监听接口的实现类。
当发送半消息成功时,我们可以使用 TransactionListener
接口中的
- executeLocalTransaction 方法来执行本地事务。
- checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
public class TransactionListenerImpl implements TransactionListener {
// 本地事务状态,可以放数据库,redis等
private ConcurrentHashMap<String, Boolean> localTrans = new ConcurrentHashMap<>();
/**
* 在该方法中执行本地事务
* @param msg - 消息本身
* @param arg - 发送事务消息时,传入的参数值
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = (String) arg;
//记录本地事务执行结果
boolean success = persistLocalTransactionResult(orderId);
System.err.println("订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success);
return success ? LocalTransactionState.UNKNOW : LocalTransactionState.ROLLBACK_MESSAGE;
}
/**
* 消息事务状态回查:默认回查15次,一分钟检查一次
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String orderId = messageExt.getKeys();
System.err.println("执行事务消息回查,orderId: " + orderId);
return Boolean.TRUE.equals(localTrans.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
private boolean persistLocalTransactionResult(String orderId) {
boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
localTrans.put(orderId, success);
return success;
}
}
注意:
- 本地事务:回滚状态, 中间状态(需要回查的)
- 回查时,把中间状态 变成 提交状态。所以:投递给消费者的订单只有订单id的 hashcode为偶数。
3、消息消费者
public class TranscationComuser {
public static void main(String[] args) throws Exception {
// 1.创建消费者 Consumer,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_client_TransactionProducerGroup");
// 2.指定 Nameserver 地址
consumer.setNamesrvAddr("192.168.xxx.xxx:9876");
// 3.订阅主题 Topic 和 Tag
consumer.subscribe("TopicTest", "Tag_Transaction2");
// 4.注册消息监听器,设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者 consumer
consumer.start();
}
}
注意:
只关注了 消息生产者本地事务的成功或回滚,来决定是否真正投递消息给 消费者。
作为消费者,需要:
- 确保消息的幂等性消费
- 如果消费失败就重试机制,最终要是消息成为了死信消息时,就需要特殊处理。
参考官方文档:
- 事务设计文档:https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
- 事务样例文档:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
Stay Hungry, Stay Foolish. 求知若饥,虚心若愚。