Exploring RocketMQ Transactional Messages

Introduction to RocketMQ Transactional Messages

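  1. RocketMQ implements transactional messages using the two-phase commit (2PC) idea, and adds a compensation mechanism for messages whose second phase times out or fails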
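  2. For background on when to use them, see Alibaba Cloud's "Transactional messages" documentation
    2.1 A traditional large transaction can be split into smaller ones, with RocketMQ's distributed transactional messages guaranteeing eventual consistency of the data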

RocketMQ Transactional Message Source Code Analysis

Demo environment

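  1. Clone the RocketMQ source code from GitHub; for setting up the source environment, see "Setting up a RocketMQ source debugging environment"
  2. Start namesrv and the broker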
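  3. Start the transactional producer TransactionProducer from the example module
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class TransactionProducer {
        public static void main(String[] args) throws Exception {
            // Transaction listener (TransactionListenerImpl lives in the same example package)
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            // Thread pool that handles the broker's transaction check-back requests
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), r -> {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                });

            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            // Start the producer
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            int i = 0;
            Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the transactional message
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);

            producer.shutdown(); // set a breakpoint here so the producer is not shut down immediately
        }
    }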
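TransactionListenerImpl itself is not shown above. Its job is to run the local transaction and to answer the broker's check-back later. The sketch below is a minimal, self-contained stand-in under stated assumptions: LocalTxState, TxListener and RecordingTxListener are hypothetical local types that only mirror the shape of RocketMQ's LocalTransactionState and TransactionListener (the real callbacks receive a Message, not a transaction ID), and the "business logic" is just the boolean passed as arg.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for RocketMQ's LocalTransactionState (same constant names, including UNKNOW).
enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Toy stand-in for TransactionListener: the real one receives a Message, here only its transaction ID.
interface TxListener {
    LocalTxState executeLocalTransaction(String transactionId, Object arg);

    LocalTxState checkLocalTransaction(String transactionId);
}

// Hypothetical listener that records each local outcome so a later check-back can be answered.
class RecordingTxListener implements TxListener {
    private final Map<String, LocalTxState> outcomes = new ConcurrentHashMap<>();

    @Override
    public LocalTxState executeLocalTransaction(String transactionId, Object arg) {
        // Placeholder for real business logic: 'arg' simply says whether it succeeded.
        LocalTxState state = Boolean.TRUE.equals(arg)
            ? LocalTxState.COMMIT_MESSAGE
            : LocalTxState.ROLLBACK_MESSAGE;
        outcomes.put(transactionId, state);
        return state;
    }

    @Override
    public LocalTxState checkLocalTransaction(String transactionId) {
        // No record yet: the local transaction may still be running, so ask again later.
        return outcomes.getOrDefault(transactionId, LocalTxState.UNKNOW);
    }
}
```

A common design choice (an assumption on my part, not taken from the example) is to persist the outcome in the same database transaction as the business data, so that checkLocalTransaction can still answer correctly after a producer restart. The in-memory map here would be lost on restart.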
    

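Flow analysis

  1. Sending and committing a transactional message:
    1.1 Send the message (a half message)
    1.2 The broker replies with the result of writing it
    1.3 Execute the local transaction based on the send result (if the write failed, the half message is invisible to the business side and the local logic is not executed)
    1.4 Commit or roll back according to the local transaction state (a Commit generates the message index, which makes the message visible to consumers)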
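To make the four steps concrete, here is a toy single-process model. It is not RocketMQ code: ToyTxBroker and ToyTxProducer are hypothetical, the "broker" is two in-memory collections, writes never fail, and rollback simply drops the half message (real RocketMQ cannot delete messages from its append-only log). What it does show is the ordering: the half message is stored first and stays invisible, the local transaction runs next, and only a commit makes the message visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Toy in-memory "broker": half messages are kept apart from what consumers can see.
class ToyTxBroker {
    private final Map<Long, String> halfMessages = new HashMap<>(); // offset -> body, invisible
    private final List<String> visibleTopic = new ArrayList<>();     // readable by consumers
    private long nextOffset = 0;

    // Steps 1.1/1.2: store a half message; the returned offset plays the role of the write result.
    long sendHalf(String body) {
        long offset = nextOffset++;
        halfMessages.put(offset, body);
        return offset;
    }

    // Step 1.4: commit makes the message visible, rollback discards it.
    void end(long offset, boolean commit) {
        String body = halfMessages.remove(offset);
        if (body != null && commit) {
            visibleTopic.add(body);
        }
    }

    List<String> consumerView() {
        return new ArrayList<>(visibleTopic);
    }
}

class ToyTxProducer {
    // Step 1.3: the local transaction runs only after the half message has been stored.
    static void sendInTransaction(ToyTxBroker broker, String body, BooleanSupplier localTx) {
        long offset = broker.sendHalf(body);
        boolean committed = localTx.getAsBoolean();
        broker.end(offset, committed);
    }
}
```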

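  2. Sending entry point: TransactionMQProducer#sendMessageInTransaction
    2.1 Debugging DefaultMQProducerImpl#sendMessageInTransaction shows that a transactional message essentially just carries the extra flag TRAN_MSG=true (see step 3 below for how the broker handles it)
    2.2 If the half message is sent successfully, the transaction listener is triggered to execute the local transaction
    2.3 Depending on the local transaction state, a commit or rollback request is sent to the broker (see step 4 below)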

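    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // Transaction listener
        TransactionListener transactionListener = getCheckListener();
        // Add the transaction flag TRAN_MSG=true and PGROUP=please_rename_unique_group_name
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        // Send the half message
        SendResult sendResult;
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    // Use the message's UNIQ_KEY property as the transaction ID
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        // Execute the local transaction
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                } catch (Throwable e) {
                    // The state stays UNKNOW; the broker will check back later
                    localException = e;
                }
                break;
            }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message may not be safely stored: skip the local transaction and roll back
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        // Send a commit or rollback request to the broker according to the local transaction state
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            // Only logged (logging omitted); an unresolved half message is picked up by the check-back
        }
        // ... wrap sendResult and localTransactionState into a TransactionSendResult and return it (omitted)
    }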
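Pulled out of the producer, the decision about which state reaches endTransaction is small. As I read the 4.x client source, a status other than SEND_OK (FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE) yields ROLLBACK_MESSAGE without running the local transaction, an exception from the listener leaves the state at UNKNOW, and a null return is also treated as UNKNOW. The sketch below restates that logic with locally defined types (ToySendStatus, ToyLocalState, ToyLocalTx and EndStateDecider are hypothetical names) so it runs without the client jar; please check it against your RocketMQ version.

```java
// Toy copies of the two enums involved (constant names follow RocketMQ's SendStatus and
// LocalTransactionState; the types themselves are local to this sketch).
enum ToySendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

enum ToyLocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical stand-in for TransactionListener#executeLocalTransaction.
interface ToyLocalTx {
    ToyLocalState execute() throws Exception;
}

class EndStateDecider {
    // Decide which state is reported to the broker for a given half-message send status.
    static ToyLocalState decide(ToySendStatus status, ToyLocalTx localTx) {
        if (status != ToySendStatus.SEND_OK) {
            // The half message may not be safely stored: never run the local transaction.
            return ToyLocalState.ROLLBACK_MESSAGE;
        }
        try {
            ToyLocalState state = localTx.execute();
            // A listener returning null is treated as "unknown".
            return state == null ? ToyLocalState.UNKNOW : state;
        } catch (Exception e) {
            // Outcome unknown: the broker's check-back will resolve it later.
            return ToyLocalState.UNKNOW;
        }
    }
}
```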
    
  3. How does the broker handle a transactional message once it is sent?
    3.1 Start with SendMessageProcessor#asyncSendMessage, which branches on the transaction flag

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // ... build msgInner (MessageExtBrokerInner) from the request header and body (omitted)
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        CompletableFuture<PutMessageResult> putMessageResult = null;
        // If the message carries the transaction flag, handle it as a transactional (half) message;
        // otherwise store it as a normal message
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        // ... convert putMessageResult into the response future and return it (omitted)
    }
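The transaction flag travels inside the message's properties string, which string2messageProperties parses on the broker. In RocketMQ 4.x's MessageDecoder, that string is a sequence of entries of the form name, char 1, value, char 2. The sketch below is a self-contained re-implementation for illustration only (ToyPropsRouter is a hypothetical name, and it skips edge cases the real decoder may handle); it shows how a message ends up on the half-message path when TRAN_MSG is "true".

```java
import java.util.HashMap;
import java.util.Map;

class ToyPropsRouter {
    // Separators assumed to match RocketMQ 4.x MessageDecoder.
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;
    static final String TRAN_MSG = "TRAN_MSG"; // MessageConst.PROPERTY_TRANSACTION_PREPARED

    // Producer side: serialize properties as name\u0001value\u0002 pairs.
    static String encode(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR).append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        return sb.toString();
    }

    // Broker side: parse the string back into a map, skipping malformed entries.
    static Map<String, String> decode(String s) {
        Map<String, String> map = new HashMap<>();
        for (String item : s.split(String.valueOf(PROPERTY_SEPARATOR))) {
            int i = item.indexOf(NAME_VALUE_SEPARATOR);
            if (i > 0) {
                map.put(item.substring(0, i), item.substring(i + 1));
            }
        }
        return map;
    }

    // The same test asyncSendMessage applies before choosing asyncPrepareMessage.
    static boolean isTransactionHalf(String propertiesString) {
        String transFlag = decode(propertiesString).get(TRAN_MSG);
        return transFlag != null && Boolean.parseBoolean(transFlag);
    }
}
```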
    

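    3.2 Handling the transactional message: TransactionalMessageBridge#asyncPutHalfMessage
    (1) parseHalfMessageInner converts the message into a half message, replacing its topic and queue so that consumers cannot consume it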

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // Back up the original topic and queueId in the message properties
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        // Reset the transaction bits of the system flag to TRANSACTION_NOT_TYPE
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // Redirect the message to topic RMQ_SYS_TRANS_HALF_TOPIC, queueId 0
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
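The half-message trick is a reversible rewrite of topic and queueId. The toy below (ToyMsg and HalfConverter are hypothetical; the property keys REAL_TOPIC and REAL_QID and the topic name RMQ_SYS_TRANS_HALF_TOPIC are the values I believe RocketMQ 4.x uses) shows the forward direction done by parseHalfMessageInner and the reverse direction that endMessageTransaction performs at commit time. The sysFlag handling is left out, and removing the backup properties on restore is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal mutable message with only the fields the conversion touches (hypothetical type).
class ToyMsg {
    String topic;
    int queueId;
    final Map<String, String> props = new HashMap<>();

    ToyMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class HalfConverter {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String REAL_TOPIC = "REAL_TOPIC"; // MessageConst.PROPERTY_REAL_TOPIC
    static final String REAL_QID = "REAL_QID";     // MessageConst.PROPERTY_REAL_QUEUE_ID

    // Forward: back up the real destination, then park the message in the half topic, queue 0.
    static void toHalf(ToyMsg m) {
        m.props.put(REAL_TOPIC, m.topic);
        m.props.put(REAL_QID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0;
    }

    // Reverse (at commit): restore the real destination from the backed-up properties.
    static void restore(ToyMsg m) {
        m.topic = m.props.remove(REAL_TOPIC);
        m.queueId = Integer.parseInt(m.props.remove(REAL_QID));
    }
}
```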
    

    (2) DefaultMessageStore#asyncPutMessage writes the half message to the commitLog

  4. Sending the commit or rollback request: DefaultMQProducerImpl#endTransaction
    4.1 Set the commitOrRollback field according to the transaction state and send the request to the broker

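    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // Decode the half message's commitLog offset from its message ID
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        // Find the address of the broker that owns the half message's queue
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        // Build the commit/rollback request header
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // Set commitOrRollback according to the local transaction state
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                // Neither commit nor rollback: the broker resolves it later through a check-back
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        // ... producer group, queue offset and msgId are also set on the header (omitted)
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // Send to the broker one-way (only send the request, never wait for a response)
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }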
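The commitOrRollback values are just the two transaction bits of a message's sysFlag. The sketch below reproduces the MessageSysFlag constants and the resetTransactionValue/getTransactionValue bit logic as I understand them from RocketMQ 4.x (please verify against your version); ToySysFlag and commitOrRollbackFor are hypothetical names, the latter restating the switch in endTransaction.

```java
class ToySysFlag {
    // Transaction bits (bits 2 and 3 of sysFlag), assumed to match RocketMQ 4.x MessageSysFlag.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Read the two transaction bits; ROLLBACK_TYPE (binary 1100) doubles as the mask.
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the two transaction bits, then set them to 'type'; all other bits are kept.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    // The mapping endTransaction applies before sending commitOrRollback.
    static int commitOrRollbackFor(State state) {
        if (state == State.COMMIT_MESSAGE) {
            return TRANSACTION_COMMIT_TYPE;
        } else if (state == State.ROLLBACK_MESSAGE) {
            return TRANSACTION_ROLLBACK_TYPE;
        }
        return TRANSACTION_NOT_TYPE; // UNKNOW: leave it to the check-back
    }
}
```

For example, starting from a flag of 0x1 (some unrelated low bit), resetting to PREPARED gives 5, and resetting that to COMMIT gives 9: the low bit survives and only bits 2 and 3 change.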
    

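    4.2 The broker's entry point for EndTransaction is EndTransactionProcessor#processRequest
    (1) Look up the half message at the commitLog offset given in the request
    (2) Convert the half message back into the original message (restoring its topic and queueId)
    (3) Write the message to its real topic, from which it can be delivered to consumers
    (4) If the write succeeds, "delete" the prepare message, which in practice means writing a record to the Op topic
    (5) For a Rollback, the half message is looked up and only an Op record is written; the message is never restored or delivered to the real topic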

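    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        // ... broker role and request checks (omitted)
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // Look up the half message
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // Convert the prepare message back into the original message, whose topic is the real topic
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                // ... sysFlag, queue offset and timestamps are also restored here (omitted)
                // Write the message to the real topic; from now on it can be delivered to consumers
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // Once persisted, "delete" the prepare message, i.e. write a record to the Op topic
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // Rollback: look up the half message, then only write an Op record; nothing is delivered
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }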
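The broker-side bookkeeping can be modelled with three append-only lists: the half topic, the real topic and the Op topic. In this toy (ToyEndTxBroker is hypothetical, and every write succeeds), a commit copies the half message to the real topic and then appends an Op record holding the half offset, while a rollback appends only the Op record. Nothing is ever removed from any list.

```java
import java.util.ArrayList;
import java.util.List;

// Toy broker with three append-only logs; entries are never removed.
class ToyEndTxBroker {
    final List<String> halfLog = new ArrayList<>();      // index = offset in the half topic
    final List<String> realTopicLog = new ArrayList<>(); // what consumers of the real topic see
    final List<Long> opLog = new ArrayList<>();          // each entry: the half offset it resolves

    long prepare(String body) {
        halfLog.add(body);
        return halfLog.size() - 1;
    }

    // Returns false when no half message exists at that offset.
    boolean endTransaction(long halfOffset, boolean commit) {
        if (halfOffset < 0 || halfOffset >= halfLog.size()) {
            return false;
        }
        if (commit) {
            // Commit: restore the message and write it to the real topic first...
            realTopicLog.add(halfLog.get((int) halfOffset));
        }
        // ...then, for commit and rollback alike, mark the half message as resolved.
        opLog.add(halfOffset);
        return true;
    }
}
```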
    

Additional notes on the flow

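  1. RocketMQ's transactional message design introduces the concept of an Op message: an Op message marks that a transactional message's status has been determined (Commit or Rollback)
    1.1 If a transactional message has no corresponding Op message, its status is still undetermined (the second phase may have failed)
    1.2 In fact RocketMQ cannot truly delete a message at all, because its files are written sequentially (append-only); the Op record acts as a logical delete
  2. How Op messages are stored and mapped to half messages: Op messages go to the topic RMQ_SYS_TRANS_OP_HALF_TOPIC, and the body of each Op message holds the queue offset of the half message it resolves in RMQ_SYS_TRANS_HALF_TOPIC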
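  3. How are messages whose second phase failed handled?
    3.1 If a network problem causes the Commit to fail, the broker starts a check-back for messages whose status is undetermined
    (1) The broker sends the message to the corresponding producer (a producer in the same group)
    (2) The producer checks the local transaction status for that message and then commits or rolls back
    3.2 The broker performs the check-back by comparing half messages against Op messages, and advances its checkpoint
    3.3 By default a message is checked back at most 15 times; beyond that, it is rolled back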
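The check-back can be sketched as a scan over the half queue that skips every offset already covered by an Op record. This is a simplified model under stated assumptions: ToyCheckService is hypothetical, the checkpoint here is simply the first offset not yet resolved, the real service's check interval and transaction timeout are ignored, and a COMMIT answer only marks the message as resolved (real RocketMQ would also deliver it). MAX_CHECKS = 15 follows the default of 15 check-backs mentioned above; the 16th attempt gives up and treats the message as rolled back.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.LongFunction;

class ToyCheckService {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    static final int MAX_CHECKS = 15; // assumed default check-back limit (transactionCheckMax)

    final int halfCount;                                   // half messages at offsets 0..halfCount-1
    final Set<Long> opOffsets = new HashSet<>();           // half offsets that already have an Op record
    final Map<Long, Integer> checkTimes = new HashMap<>(); // check attempts per half offset
    long checkpoint = 0;                                   // first half offset not yet known to be resolved

    ToyCheckService(int halfCount) {
        this.halfCount = halfCount;
    }

    // One scan: ask the producer about every half message that has no Op record yet.
    void checkOnce(LongFunction<State> producerCheck) {
        for (long off = checkpoint; off < halfCount; off++) {
            if (opOffsets.contains(off)) {
                continue; // already committed or rolled back
            }
            int times = checkTimes.merge(off, 1, Integer::sum);
            if (times > MAX_CHECKS) {
                opOffsets.add(off); // give up: treated as rolled back
                continue;
            }
            if (producerCheck.apply(off) != State.UNKNOWN) {
                opOffsets.add(off); // the producer resolved it (commit or rollback)
            }
        }
        // Advance the checkpoint past the leading run of resolved offsets.
        while (checkpoint < halfCount && opOffsets.contains(checkpoint)) {
            checkpoint++;
        }
    }
}
```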

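Closing remarks

  1. I have not used RocketMQ transactional messages in a real project; this article records what I have learned so far, as groundwork for future implementations
  2. Please point out anything I got wrong; discussion of real-world scenarios is very welcome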