RoceketMQ事务消息介绍
- RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息
- 使用背景可以参考阿里云的《事务消息》介绍
2.1 传统的大事务可以被拆分为小事务,通过RocketMQ版分布式事务消息保证数据的最终一致性
RocketMQ事务消息源码分析
Demo演示环境
- 在github上拉取RocketMQ源码,具体源码搭建环境参考《搭建RocketMq源码调式环境》
- 启动namesrv和broker
- 启动example的事务生产者TransactionProducer
// 事务监听器 TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); // 启动生产者 producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; int i = 0; Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); producer.shutdown(); //断点在这,避免生产者被立马关闭
流程分析
-
事务发送提交过程:
1.1 发送消息(half消息)
1.2 服务端响应消息写入结果
1.3 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
1.4 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见) -
发送消息入口TransactionMQProducer#sendMessageInTransaction
2.1 debug DefaultMQProducerImpl#sendMessageInTransaction源码发现,其实事务消息主要就是多了TRAN_MSG=true的事务标识(关于服务端如何响应见下面分析3)
2.2 如果半消息发送成功,则触发事务监听器执行本地事务
2.3 根据本地事务状态,向broker发送commit或者rollback请求(详情见下方4)public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { // 事务监听器 TransactionListener transactionListener = getCheckListener(); // 添加TRAN_MSG=true事务消息标识,以及PGROUP=please_rename_unique_group_name MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 发送半消息 SendResult sendResult = this.send(msg); // 消息发送成功处理 switch (sendResult.getSendStatus()) { case SEND_OK: { //消息的UNIQ_KEY属性作为事务ID String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (transactionListener != null) { log.debug("Used new transaction API"); // 执行本地事务 localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } } } // 根据本地事务状态,向broker发送commit或者rollback请求 this.endTransaction(msg, sendResult, localTransactionState, localException); }
-
发送事务消息后,broker是怎样响应的呢?
3.1 首先看一下SendMessageProcessor#asyncSendMessage,发现是根据事务标识做分别处理private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); CompletableFuture<PutMessageResult> putMessageResult = null; // 判断消息是否有事务标识,有则按照事务消息处理,否则按照普通消息处理 if (transFlag != null && Boolean.parseBoolean(transFlag)) { putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); }else { putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } }
3.2 处理事务消息TransactionalMessageBridge#asyncPutHalfMessage
(1)将消息转换为半消息parseHalfMessageInner,更换主题以及队列,防止被消费端消费private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { // 备份原消息的topic和queueId MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); // 设置消息的系统标识为TRANSACTION_NOT_TYPE msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); // 重新赋值消息的topic为RMQ_SYS_TRANS_HALF_TOPIC以及queueId为0 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
(2)DefaultMessageStore#asyncPutMessage将半消息写入commitLog
-
发送commit或rollback请求:DefaultMQProducerImpl#endTransaction
4.1 根据事务状态设置CommitOrRollback属性,向broker发送请求public void endTransaction( final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { // 找到半消息所在队列的broker地址 final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); // 组装事务提交/回滚请求报文 EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); // 根据本地事务状态,设置commitOrRollback属性进行提交或者回滚 switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } // 以oneway的方式发送broker(单向:只发送请求不等待应答) this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
4.2 broker接收EndTransaction的入口在EndTransactionProcessor#processRequest
(1)根据请求的偏移量获取指定的half消息
(2)将half消息 转换为原始消息(topic、queueId更换)
(3)将消息发送到真正的Topic里,该消息可以开始下发给消费者
(4)如果落盘成功,则删除prepare消息,其实是将消息写入到Op Topic里
(5)如果是Rollback,则直接将消息转换为原消息,并写入到Op Topic里public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { OperationResult result = new OperationResult(); if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 获取半消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { // 将prepare消息转换为原消息,该消息的Topic就是真正消息的Topic MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // 将消息发送到真正的Topic里,该消息可以开始下发给消费者 RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { //如果落盘成功,则删除prepare消息,其实是将消息写入到Op Topic里 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } } } }
流程补充
- RocketMQ事务消息方案中引入Op消息的概念,用Op消息标识事务消息已经确定状态(commit或者Rollback)
1.1 如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)
1.2 实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的 - Op消息的存储和对应关系
- 如何处理二阶段失败的消息?
3.1 如果出现网络问题导致Commit失败,Broker端对未确定状态的消息发起回查
(1)将消息发送到对应的Producer端(同一个Group的Producer)
(2)Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback
3.2 Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint
3.3 默认回查15次,超过则默认回滚该消息
束语
- 自己没有实战过RocketMQ事务消息,本篇文章相当于是技术积累。在以后实现中积累方案
- 有说的不对的地方帮忙指正,实战的场景欢迎大家多多讨论