试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题


rocketmq版本是 4.9.2, rocketmq-spring-boot-starter版本是 2.2.1

一. 代码

生产者代码

  • App类
package com.zgd.springboot.demo.simple;

import java.util.Date;

import com.zgd.springboot.demo.simple.mq.MQProducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class Application {


  public static void main(String[] args) {
    ApplicationContext context = SpringApplication.run(Application.class, args);
    MQProducer mqProducer = context.getBean(MQProducer.class);
    mqProducer.sendTransactionMsg("我来测试一下事务消息 777" + new Date().toLocaleString());
    // for (int i = 0; i < 1; i++) {
    //   mqProducer.sendMsg("我来测试一下"+ new Date().toLocaleString()+" -- "+i);
    // }
  }

}
  • 普通消息发送类
package com.zgd.springboot.demo.simple.mq;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;

import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * RocketMqListener
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Component
@Slf4j
public class MQProducer {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  /**
   * 发送普通消息
   */
  public void sendMsg(String msgBody) {
    log.info("发送普通消息");
    rocketMQTemplate.syncSend(
      "queue_test_topic",
      MessageBuilder
        .withPayload(msgBody.getBytes(StandardCharsets.UTF_8))
        .setHeader(RocketMQHeaders.MESSAGE_ID, System.currentTimeMillis() + "")
        .setHeader(
          RocketMQHeaders.KEYS,
          "key-" + System.currentTimeMillis() + ""
        )
        .build()
    );
  }

  public void sendTransactionMsg(String msgBody) {
    log.info("发送事务消息");
    //模拟这个是事务id
    UUID id = UUID.randomUUID();
    int nextInt = RandomUtils.nextInt(0, 100);
    //会一直卡在这个方法, 直到回查结束, 或者方法提交
    System.out.println("发送"+new Date().toLocaleString());
    rocketMQTemplate.sendMessageInTransaction(
      "queue_test_tc_topic",
      MessageBuilder
        .withPayload(msgBody.getBytes(StandardCharsets.UTF_8))
        .setHeader(RocketMQHeaders.TRANSACTION_ID, id)
        .setHeader(RocketMQHeaders.MESSAGE_ID, System.currentTimeMillis() + "")
        .setHeader(
          RocketMQHeaders.KEYS,
          "key-" + System.currentTimeMillis() + ""
        )
        .build(),
      nextInt
    );
    System.out.println("");
    System.out.println("结束发送"+new Date().toLocaleString());

  }
}

  • 事务消息发送类
package com.zgd.springboot.demo.simple.mq;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@RocketMQTransactionListener
public class TransactionListenerImpl  implements RocketMQLocalTransactionListener {

  private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
  private static Map<String, Integer> BACK_QUERY_MAP = new HashMap<>();

  /**
   *  执行业务逻辑
   *
   * @param message
   * @param o
   * @return
   */
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(
    Message message,
    Object o
  ) {
    String transId = (String) message
      .getHeaders()
      .get(RocketMQHeaders.TRANSACTION_ID);
    System.out.println("监听处理本地事务" + new Date().toLocaleString());
    System.out.println("");
    try {
      System.out.println(
        "执行操作,模拟开启事务.事务id: " + transId + " 收到了传过来的对象: " + o
      );
      Thread.sleep(61000);
      // 设置事务状态
      if (o.toString().hashCode() % 2 == 0) {
        throw new RuntimeException("模拟异常");
      }
      System.out.println("模拟完成了事务.提交" + new Date().toLocaleString());
      STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
      // 返回事务状态给生产者
      return RocketMQLocalTransactionState.COMMIT;
    } catch (Exception e) {
      e.printStackTrace();
    }
    System.out.println("模拟事务出错.回滚" + new Date().toLocaleString());
    STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
    return RocketMQLocalTransactionState.ROLLBACK;
  }

  /**
   * rocket会在executeLocalTransaction方法执行后,隔段时间来回查, 即便是已经异常回滚了
   * ,每次执行完默认会等待1min(transactionCheckMax参数)执行下一次,默认6s(transactionTimeOut参数)为事务检查的最小时间,默认最大检查次数为15次(transactionCheckMax参数)
   * @param message
   * @return
   */
  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    System.out.println("---------");
    MessageHeaders headers = message.getHeaders();
    Set<Entry<String, Object>> entrySet = headers.entrySet();
    System.out.println("收到回查" + new Date().toLocaleString());
    for (Entry<String, Object> entry : entrySet) {
      System.out.println(entry.getKey() + " > " + entry.getValue());
    }

    String transId = (String) message
      .getHeaders()
      .get(RocketMQHeaders.TRANSACTION_ID);
    System.out.println(
      "回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId)
    );
    System.out.println("");
    if(BACK_QUERY_MAP.get(transId) == null){
      BACK_QUERY_MAP.put(transId,1);
    }else{
      BACK_QUERY_MAP.put(transId,BACK_QUERY_MAP.get(transId)+1);
    }
    //如果回查超过3次就直接回滚

    return STATE_MAP.get(transId) == null ? (BACK_QUERY_MAP.get(transId) < 3 ? RocketMQLocalTransactionState.UNKNOWN : RocketMQLocalTransactionState.ROLLBACK) : STATE_MAP.get(transId);
  }
}

消费者代码

  • App类
    普通的启动类, 忽略
  • 普通消息监听类
package com.zgd.springboot.demo.simple.mq;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * RocketMqListener
 * @date: 2020/11/26
 * consumer重试(两种:监听、自定义消费者)
 * 这是监听的方式
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(
  topic = "queue_test_topic",
  selectorExpression = "*",
  consumerGroup = "queue_group_test",
  maxReconsumeTimes=2
  
)
public class CommonMQListener implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener {

  @Override
  public void onMessage(MessageExt messageExt) {
    byte[] body = messageExt.getBody();
    String msg = new String(body,StandardCharsets.UTF_8);
    int reconsumeTimes = messageExt.getReconsumeTimes();
    log.info("接收到消息:{} | 重新消费次数: {}", msg, reconsumeTimes);
    //模拟异常 会自动重试. 超过重试次数会:DefaultRocketMQListenerContainer : consume message failed, 然后放到topic加DLQ作为后缀的名字的死信队列中
  
    // int i = 1/0;
  }

  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
      // 每次拉取的间隔,单位为毫秒
      consumer.setPullInterval(3000);
      // 设置每次从队列中拉取的消息数为4. 而默认每个主题下创建队列为4个, writeQueueNums=readQueueNums =4, 所以每次拉取 4 * 4 = 16
      consumer.setPullBatchSize(4);
  }
}

  • 事务消息监听类
package com.zgd.springboot.demo.simple.mq;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * RocketMqListener
 * @date: 2020/11/26
 * consumer重试(两种:监听、自定义消费者)
 * 这是监听的方式
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(
  topic = "queue_test_tc_topic",
  selectorExpression = "*",
  //同一个消费组必须订阅同一个topic
  consumerGroup = "queue_group_tc_test",
  maxReconsumeTimes=2
  
)
public class TransactionMQListener implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener {

  @Override
  public void onMessage(MessageExt messageExt) {
    byte[] body = messageExt.getBody();
    String msg = new String(body,StandardCharsets.UTF_8);
    int reconsumeTimes = messageExt.getReconsumeTimes();
    String keys = messageExt.getKeys();
    String transactionId = messageExt.getTransactionId();
    log.info("接收到事务消息:{} | key: {} | tranId: {} | 重新消费次数: {}", msg, keys,transactionId, reconsumeTimes);
    //模拟异常 会自动重试. 超过重试次数会:DefaultRocketMQListenerContainer : consume message failed, 然后放到topic加DLQ作为后缀的名字的死信队列中
  
    // int i = 1/0;
  }

  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
      // 每次拉取的间隔,单位为毫秒
      consumer.setPullInterval(1000);
      // 设置每次从队列中拉取的消息数为4. 而默认每个主题下创建队列为4个, writeQueueNums=readQueueNums =4, 所以每次拉取 4 * 4 = 16
      consumer.setPullBatchSize(4);
      log.info("初始化事务消费者");
  }
}

二. 笔记

2.1 生产者

2.1.1 事务消息

RocketMQ 3.0.8之后 到4.3.0 之前不支持事务消息

2.1.1.1 事务消息回查

如果按事务消息的模式发送
消息是以HALF消息发送到broker的一个HALF队列中, 状态为UN_KNOW, 此时还没投递到目标队列中.
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

所以消息对消费者不可见. 生产者确认提交则消息才能被消费者消费到. 如果回滚则消息销毁.
同时rocketmq定时(每分钟)回查一次生产者, 每次回查都会重新投递到HALF队列, 避免提交或回滚没有触发成功.

2.1.1.2 消息重复问题

一开始我为了方便测试, 把本地事务的sleep时间设置成120s, 结果发现总是会出现2条重复消息. 后面仔细排查发现:

当本地事务耗时比较久, 就会导致触发回查, 并往HALF主题插入一条重复消息.

  • 第一条投入目标队列的消息
    当本地事务执行完成, 返回COMMIT, 就会丢入目标topic中给消费者消费.
  • 第二条投入目标队列的消息
    在定时任务下, MQ再次回查也得到了了COMMIT的回复, 就也会往目标队列丢一条消息, 就造成了重复消息.

如下图所示:
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

避免办法: 尽量保证本地事务在一分钟内完成

2.2 消费者

2.2.1 消费重试

消费者如果在消费的时候抛出异常, 会尝试重新消费, 间隔时间随次数增加而变长. 默认会重试16次. 失败以后丢入死信队列中.

2.2.2 批量拉取

消费者可以设置PullBatchSize参数, 表示从每个队列中一次拉取的消息数. 而一个broker默认有4个(broker默认是创建8个队列, 但是生产者的DefaultMQProducer默认创建4个, 以少的为准)

2.2.3 无法消费消息, 消息的状态是NOT_CONSUME_YET

这个我也是一开始并不太清楚下面的注解中, consumerGroup消费组和topic之间的关系, 在测试事务消费的时候, 直接复制拿过来用, 改了下topic. 也就是同一个组下订阅了两个topic

@RocketMQMessageListener(
  topic = "queue_test_topic",
  selectorExpression = "*",
  consumerGroup = "queue_group_test",
  maxReconsumeTimes=2
  
)

结果就出现了上面说的有些消息无法消费的问题, 看了下控制台, 消费者那边, 发现queue_test_topic的消费终端是0个, 而queue_test_tc_topic的消费终端是2个
再看queue_test_topic的队列中, 自然是没有消费端, 而queue_test_tc_topic的队列中, 4个队列(默认4个队列)中只有2个队列有订阅.

原则
同一个组, 订阅关系必须一样

RocketMQ里, 消费组group订阅关系信息是一个Map. 如果消费者实例A和B同在一个组里, 分别订阅了a和b, 那么在注册的时候晚加载的B-b关系就会覆盖A-a, 导致A和B都订阅b.
上面我的情况就是普通消费者和事务消费者最后都订阅了
queue_test_tc_topic

错误的情况:
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

正常的情况:
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题
试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题

  • 如果 同一个组, 多个消费者, 订阅同一个topic: 可以, 会把消费者自动分配到topic的每个queue
  • 如果 同一个组, 多个消费者, 订阅不同topic: 不行, 会混乱, 导致topic订阅关系被覆盖, 且topic里queue的匹配关系也会分配不全. 导致消息无法被消费
  • 如果 同两个组, 各自一个消费者, 分别订阅两个topic: 可以, 会自动将唯一一个消费者匹配到所有queue队列
  • 如果 不同组, 订阅同一个topic: 可以. RocketMQ会让每个组的消费者都自动分配到topic的每个queue, 可以同时消费到消息
上一篇:蓝桥杯 第八讲 数论


下一篇:CF1359D Yet Another Yet Another Task