RocketMQ事务消息

一、RocketMQ事务消息

RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
注意:主要针对消息发送者。

RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

1、RocketMQ事务消息流程概要

RocketMQ事务消息
上图说明了事务消息的大致方案,其中分为两个流程:

  • 正常事务消息的发送及提交
  • 事务消息的补偿流程。

1.1 正常事务流程

  • (1) 发送消息(half 消息):图1。
  • (2) 服务端响应消息写入结果:图2。
  • (3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图3。
  • (4) 根据本地事务状态执行 Commit 或者 Rollback(Commit操作生成消息索引,消息对消费者可见):图4。

1.2 事务补偿流程

  • (1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图5。
  • (2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图6。
  • (3) 根据本地事务状态,重新 Commit 或者 Rollback::图7。

其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

2、 事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息(完成了图中 1,2,3,4 步,第 4 步是 Commit)。
  • TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费(完成了图中 1,2,3,4 步, 第 4 步是 Rollback)。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态(完成了图中 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。

3、使用限制:

  1. 事务消息不支持延时消息和批量消息。
  2. 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
  3. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认 情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  4. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用 户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  5. 事务性消息可能不止一次被检查或消费。
  6. 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
  7. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事 务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  8. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。

二、代码演示

1、消息发送者

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态。

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建消息生产者 producer,并指定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("mq_client_TransactionProducerGroup");
        // 2.指定 Nameserver 地址,集群的话使用;分隔
        producer.setNamesrvAddr("192.168.xxx.xxx:9876");

        //创建线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 设置生产者回查线程池
        producer.setExecutorService(executorService);
        // 生产者设置事务监听器
        producer.setTransactionListener(new TransactionListenerImpl());
        // 3.启动 producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            String orderId = UUID.randomUUID().toString();
            Message message = new Message("TopicTest","Tag_Transaction2", orderId,
                    ("下单,orderId: " + orderId +",hashcode:" + Objects.hash(orderId)).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //5.发送消息:todo 半事务的发送
            SendResult sendResult = producer.sendMessageInTransaction(message, orderId);
            System.out.println("发送结果:" + sendResult);
            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        // 6.关闭生产者 producer实例
//        producer.shutdown();
    }
}

2、添加事务的监听接口

使用匿名内部类或者自定义事务监听接口的实现类。

当发送半消息成功时,我们可以使用 TransactionListener接口中的

  • executeLocalTransaction 方法来执行本地事务。
  • checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
public class TransactionListenerImpl implements TransactionListener {
	// 本地事务状态,可以放数据库,redis等
	private ConcurrentHashMap<String, Boolean> localTrans = new ConcurrentHashMap<>();

	/**
	 * 在该方法中执行本地事务
	 * @param msg - 消息本身
	 * @param arg - 发送事务消息时,传入的参数值
	 * @return
	 */
	@Override
	public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		String orderId = (String) arg;

		//记录本地事务执行结果
		boolean success = persistLocalTransactionResult(orderId);
		System.err.println("订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success);
		return success ? LocalTransactionState.UNKNOW : LocalTransactionState.ROLLBACK_MESSAGE;
	}

	/**
	 * 消息事务状态回查:默认回查15次,一分钟检查一次
	 * @param messageExt
	 * @return
	 */
	@Override
	public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
		String orderId = messageExt.getKeys();
		System.err.println("执行事务消息回查,orderId: " + orderId);
		return Boolean.TRUE.equals(localTrans.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
	}

	private boolean persistLocalTransactionResult(String orderId) {
		boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
		localTrans.put(orderId, success);
		return success;
	}
}

注意:

  • 本地事务:回滚状态, 中间状态(需要回查的)
  • 回查时,把中间状态 变成 提交状态。所以:投递给消费者的订单只有订单id的 hashcode为偶数。

3、消息消费者

public class TranscationComuser {

    public static void main(String[] args) throws Exception {
        // 1.创建消费者 Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mq_client_TransactionProducerGroup");
        // 2.指定 Nameserver 地址
        consumer.setNamesrvAddr("192.168.xxx.xxx:9876");
        // 3.订阅主题 Topic 和 Tag
        consumer.subscribe("TopicTest", "Tag_Transaction2");

        // 4.注册消息监听器,设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5.启动消费者 consumer
        consumer.start();
    }
}

RocketMQ事务消息
注意: 只关注了 消息生产者本地事务的成功或回滚,来决定是否真正投递消息给 消费者。
作为消费者,需要:

  • 确保消息的幂等性消费
  • 如果消费失败就重试机制,最终要是消息成为了死信消息时,就需要特殊处理。

参考官方文档:

Stay Hungry, Stay Foolish. 求知若饥,虚心若愚。

上一篇:Redis 入门


下一篇:MS SQL 學習紀錄-6