RocketMQ(十)RocketMQ分布式事务实现

目录

1、事务场景

场景:工行用户A向建行用户B转账1万元;
RocketMQ(十)RocketMQ分布式事务实现
步骤:

  • 1.工行系统发送一个给B增款1万元的同步消息M给Broker
  • 2.消息被Broker成功接收后,向工行系统发送成功ACK
  • 3.工行系统收到成功ACK后从用户A中扣款1万元
  • 4.建行系统从Broker中获取到消息M
  • 5.建行系统消费消息M,即向用户B中增加1万元

这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费;此时建行系统中用户B会增加了1万元,出现了数据不一致问题。

2、解决思路

解决思路: 让上述步骤的第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息,简单来说就是先预提交一个消息,当本地事务没有异常的情况下,再次发送一个确认消息;该思路即使用事务消息,这里要使用分布式事务解决方案;
RocketMQ(十)RocketMQ分布式事务实现
使用事务消息来处理该需求场景过程:

  1. 事务管理器TM向事务协调器TC发起指令,开启全局事务

  2. 工行系统发一个给B增款1万元的事务消息M给TC

  3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker,此时的建行系统是看不到Broker中的消息M的

  4. Broker会将预提交执行结果Report给TC。

  5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作

  6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态

  7. TC收到预扣款执行结果后,会将结果上报给TM

  8. TM会根据上报结果向TC发出不同的确认指令

    • 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
    • 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
    • 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
  9. TC在接收到指令后会向Broker与工行系统发出确认指令

    • TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认;
    • TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚;

3、事务消息概念

RocketMQ的事务消息,主要是通过消息的异步处理,可以保证本地事务和消息发送同时成功执行或失败,从而保证数据的最终一致性,具体流程如下:
RocketMQ(十)RocketMQ分布式事务实现
事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
  • RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。

事务过程:
通过sendMessageInTransaction方法将消息发送到broker,并回调事务监听器的方法,此时消息为半消息状态,需要进行二次确认才能发送到队列并由消费者进行消费。

如果MQ收到的事务状态一直是UNKNOWN,那么将不断的向MQ发送方发起回调检查本地事务状态,直到收Commit/Rollback状态的消息或者人工干预删除UNKNOWN状态的消息;

注意本地SQL事务一定要在MQ监听器的回调方法中执行

4、事务消息实现

4.1 代码说明:

核心类为TxConsumerTxProducerTxProducerListener实现流程。TxProducer调用sendMessageInTransaction方法后进入到TxProducerListener

注意本地SQL事务一定要在MQ监听器的回调方法中执行

需要注意的是: 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的Producer,需要通过注解@ExtRocketMQTemplateConfiguration定义不同的RocketMQTemplate, ,比如:

  // 定义RocketMQTemplate
  @ExtRocketMQTemplateConfiguration
  public class XXXRocketMQTemplate extends RocketMQTemplate {
  
  }
  
  // 消费者发送事务消息
  // 引入自定义的RocketMQTemplate
  @Resource(name = "xxxRocketMQTemplate")
  RocketMQTemplate rocketMQTemplate;
  
  public void send() {
    Message<String> message = MessageBuilder.withPayload("text").build();
    rocketMQTemplate.sendMessageInTransaction("topic", message, null);
  }

要使用事务消息,需要自定义消息监听器,并且与RocketMQTemplate进行绑定:

// 监听器,默认的为rocketMQTemplate
// 如果使用ExtRocketMQTemplateConfiguration是自定义了RocketMQTemplate,则需要绑定自定义的bean名称
@RocketMQTransactionListener(rocketMQTemplateBeanName="txRocketMQTemplate")
public class TxProducerListener implements RocketMQLocalTransactionListener {
   
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        .....
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        .....
    }
}

4.2 完整代码

TxConsumer:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "tx_topic", consumerGroup = "tx_group")
@Slf4j
public class TxConsumer implements RocketMQListener<String> {

    /**
     *
     * @param message
     */
    @Override
    public void onMessage(String message) {
        log.info("消息事务-接受到消息:" + message);
    }
}

TxRocketMQTemplate:

// 一个RocketMQTemplate只能注册一个事务监听器,如果存在多个事务监听器监听不同的`Producer`
// 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate
@ExtRocketMQTemplateConfiguration
public class TxRocketMQTemplate extends RocketMQTemplate {

}

TxProducerListener:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate")
public class TxProducerListener implements RocketMQLocalTransactionListener {

    /**
     * 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败
     */
    private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString();
        log.info("消息事务id为:" + transId);
        // 状态为正在执行
        transMap.put(transId, 1);
        try {
            // 本地SQL事务一定要在MQ监听器的回调方法中执行
            log.info("正在执行本地事务");

            // 模拟耗时操作估计出发mq回查操作:当RocketMQ长时间(1分钟)没有收到本地事务的返回结果
            // TimeUnit.SECONDS.sleep(80);

            // 模拟业代码执行,比如模拟插入user数据到数据库中,并且失败的情况
            // System.out.println(1 / 0);

            log.info("事务执行完成.");
        } catch (Exception e) {
            // 状态为执行失败
            transMap.put(transId, 3);
            log.error("事务执行异常.");

            // 出现异常
            // 如果不需要重试 则设置为:ROLLBACK
            // 如果需要检查事务重试,1分钟后发起检查 则设置为:UNKNOWN
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        // 状态为执行成功
        transMap.put(transId, 2);
        return RocketMQLocalTransactionState.COMMIT;
    }


    /**
     * 事务超时,回查方法
     * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

        //根据transaction的id回查该事务的状态,并返回给消息队列
        //未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态
        //正确提交返回LocalTransactionState.COMMIT_MESSAGE
        //事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        Integer status = transMap.get(transId);
        // 执行状态 1-正在执行,2-执行成功,3-执行失败
        log.info("回查的事务id为:" + transId + ",当前的状态为:" + status);
        //正在执行
        if (status == 1) {
            log.info("回查结果为:正在执行状态");
            return RocketMQLocalTransactionState.UNKNOWN;
        } else if (status == 2) {
            //执行成功,返回commit
            log.info("回查结果为:成功状态");
            transMap.remove(transId);
            return RocketMQLocalTransactionState.COMMIT;
        } else if (status == 3) {
            //执行失败,返回rollback
            log.info("回查结果为:失败状态");
            return RocketMQLocalTransactionState.ROLLBACK;
            // 通过伪代码表示 检查本地事务执行情况
            // User user = selectByUserId(userId);
            // if (user!=null) {
            //     //插入成功(本地事务完成)
            //     transMap.remove(transId);
            //     return RocketMQLocalTransactionState.COMMIT;
            // } else {
            //      // 插入失败
            //      // 如果不需要再重试 则设置为:ROLLBACK
            //      // 如果还需要检查事务重试 则设置为:UNKNOWN
            //     return RocketMQLocalTransactionState.UNKNOWN;
            // }
        }

        //  其他未知情况,统一返回不重试,删除消息
        transMap.remove(transId);
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

TxProducer:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.UUID;


@Service
@Slf4j
public class TxProducer {
    /**
     * 一个RocketMQTemplate只能注册一个事务监听器,
     * 如果存在多个事务监听器监听不同的`Producer`,
     * 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate
     */
    @Resource(name = "txRocketMQTemplate")
    RocketMQTemplate rocketMQTemplate;

    public void tx() {
        String text = "消息事务发送" + System.currentTimeMillis();
        log.info(text);
        UUID transactionId = UUID.randomUUID();
        log.info("事务ID:" + transactionId);
        Message<String> message = MessageBuilder.withPayload(text)
                // 设置事务Id
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .build();
        // 调研sendMessageInTransaction后进行到监听器中
        rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null);
        log.info("已发送...");
    }
}
上一篇:对抗拖库 —— Web 前端慢加密


下一篇:1.MQ 的相关概念