试玩RocketMQ, 事务消息, 以及NOT_CONSUME_YET消息不能被消费等问题
rocketmq版本是
4.9.2
, rocketmq-spring-boot-starter版本是
2.2.1
一. 代码
生产者代码
- App类
package com.zgd.springboot.demo.simple;
import java.util.Date;
import com.zgd.springboot.demo.simple.mq.MQProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(Application.class, args);
MQProducer mqProducer = context.getBean(MQProducer.class);
mqProducer.sendTransactionMsg("我来测试一下事务消息 777" + new Date().toLocaleString());
// for (int i = 0; i < 1; i++) {
// mqProducer.sendMsg("我来测试一下"+ new Date().toLocaleString()+" -- "+i);
// }
}
}
- 普通消息发送类
package com.zgd.springboot.demo.simple.mq;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* RocketMqListener
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Component
@Slf4j
public class MQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
public void sendMsg(String msgBody) {
log.info("发送普通消息");
rocketMQTemplate.syncSend(
"queue_test_topic",
MessageBuilder
.withPayload(msgBody.getBytes(StandardCharsets.UTF_8))
.setHeader(RocketMQHeaders.MESSAGE_ID, System.currentTimeMillis() + "")
.setHeader(
RocketMQHeaders.KEYS,
"key-" + System.currentTimeMillis() + ""
)
.build()
);
}
public void sendTransactionMsg(String msgBody) {
log.info("发送事务消息");
//模拟这个是事务id
UUID id = UUID.randomUUID();
int nextInt = RandomUtils.nextInt(0, 100);
//会一直卡在这个方法, 直到回查结束, 或者方法提交
System.out.println("发送"+new Date().toLocaleString());
rocketMQTemplate.sendMessageInTransaction(
"queue_test_tc_topic",
MessageBuilder
.withPayload(msgBody.getBytes(StandardCharsets.UTF_8))
.setHeader(RocketMQHeaders.TRANSACTION_ID, id)
.setHeader(RocketMQHeaders.MESSAGE_ID, System.currentTimeMillis() + "")
.setHeader(
RocketMQHeaders.KEYS,
"key-" + System.currentTimeMillis() + ""
)
.build(),
nextInt
);
System.out.println("");
System.out.println("结束发送"+new Date().toLocaleString());
}
}
- 事务消息发送类
package com.zgd.springboot.demo.simple.mq;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();
private static Map<String, Integer> BACK_QUERY_MAP = new HashMap<>();
/**
* 执行业务逻辑
*
* @param message
* @param o
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message message,
Object o
) {
String transId = (String) message
.getHeaders()
.get(RocketMQHeaders.TRANSACTION_ID);
System.out.println("监听处理本地事务" + new Date().toLocaleString());
System.out.println("");
try {
System.out.println(
"执行操作,模拟开启事务.事务id: " + transId + " 收到了传过来的对象: " + o
);
Thread.sleep(61000);
// 设置事务状态
if (o.toString().hashCode() % 2 == 0) {
throw new RuntimeException("模拟异常");
}
System.out.println("模拟完成了事务.提交" + new Date().toLocaleString());
STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT);
// 返回事务状态给生产者
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("模拟事务出错.回滚" + new Date().toLocaleString());
STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.ROLLBACK;
}
/**
* rocket会在executeLocalTransaction方法执行后,隔段时间来回查, 即便是已经异常回滚了
* ,每次执行完默认会等待1min(transactionCheckMax参数)执行下一次,默认6s(transactionTimeOut参数)为事务检查的最小时间,默认最大检查次数为15次(transactionCheckMax参数)
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("---------");
MessageHeaders headers = message.getHeaders();
Set<Entry<String, Object>> entrySet = headers.entrySet();
System.out.println("收到回查" + new Date().toLocaleString());
for (Entry<String, Object> entry : entrySet) {
System.out.println(entry.getKey() + " > " + entry.getValue());
}
String transId = (String) message
.getHeaders()
.get(RocketMQHeaders.TRANSACTION_ID);
System.out.println(
"回查消息 -> transId = " + transId + ", state = " + STATE_MAP.get(transId)
);
System.out.println("");
if(BACK_QUERY_MAP.get(transId) == null){
BACK_QUERY_MAP.put(transId,1);
}else{
BACK_QUERY_MAP.put(transId,BACK_QUERY_MAP.get(transId)+1);
}
//如果回查超过3次就直接回滚
return STATE_MAP.get(transId) == null ? (BACK_QUERY_MAP.get(transId) < 3 ? RocketMQLocalTransactionState.UNKNOWN : RocketMQLocalTransactionState.ROLLBACK) : STATE_MAP.get(transId);
}
}
消费者代码
- App类
普通的启动类, 忽略 - 普通消息监听类
package com.zgd.springboot.demo.simple.mq;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* RocketMqListener
* @date: 2020/11/26
* consumer重试(两种:监听、自定义消费者)
* 这是监听的方式
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "queue_test_topic",
selectorExpression = "*",
consumerGroup = "queue_group_test",
maxReconsumeTimes=2
)
public class CommonMQListener implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body,StandardCharsets.UTF_8);
int reconsumeTimes = messageExt.getReconsumeTimes();
log.info("接收到消息:{} | 重新消费次数: {}", msg, reconsumeTimes);
//模拟异常 会自动重试. 超过重试次数会:DefaultRocketMQListenerContainer : consume message failed, 然后放到topic加DLQ作为后缀的名字的死信队列中
// int i = 1/0;
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的间隔,单位为毫秒
consumer.setPullInterval(3000);
// 设置每次从队列中拉取的消息数为4. 而默认每个主题下创建队列为4个, writeQueueNums=readQueueNums =4, 所以每次拉取 4 * 4 = 16
consumer.setPullBatchSize(4);
}
}
- 事务消息监听类
package com.zgd.springboot.demo.simple.mq;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* RocketMqListener
* @date: 2020/11/26
* consumer重试(两种:监听、自定义消费者)
* 这是监听的方式
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "queue_test_tc_topic",
selectorExpression = "*",
//同一个消费组必须订阅同一个topic
consumerGroup = "queue_group_tc_test",
maxReconsumeTimes=2
)
public class TransactionMQListener implements RocketMQListener<MessageExt> , RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body,StandardCharsets.UTF_8);
int reconsumeTimes = messageExt.getReconsumeTimes();
String keys = messageExt.getKeys();
String transactionId = messageExt.getTransactionId();
log.info("接收到事务消息:{} | key: {} | tranId: {} | 重新消费次数: {}", msg, keys,transactionId, reconsumeTimes);
//模拟异常 会自动重试. 超过重试次数会:DefaultRocketMQListenerContainer : consume message failed, 然后放到topic加DLQ作为后缀的名字的死信队列中
// int i = 1/0;
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的间隔,单位为毫秒
consumer.setPullInterval(1000);
// 设置每次从队列中拉取的消息数为4. 而默认每个主题下创建队列为4个, writeQueueNums=readQueueNums =4, 所以每次拉取 4 * 4 = 16
consumer.setPullBatchSize(4);
log.info("初始化事务消费者");
}
}
二. 笔记
2.1 生产者
2.1.1 事务消息
从RocketMQ 3.0.8
之后 到4.3.0
之前不支持事务消息
2.1.1.1 事务消息回查
如果按事务消息的模式发送
消息是以HALF消息
发送到broker的一个HALF队列
中, 状态为UN_KNOW
, 此时还没投递到目标队列中.
所以消息对消费者不可见. 生产者确认提交则消息才能被消费者消费到. 如果回滚则消息销毁.
同时rocketmq定时(每分钟)回查一次生产者, 每次回查都会重新投递到HALF队列
, 避免提交或回滚没有触发成功.
2.1.1.2 消息重复问题
一开始我为了方便测试, 把本地事务的sleep时间设置成120s, 结果发现总是会出现2条重复消息. 后面仔细排查发现:
当本地事务耗时比较久, 就会导致触发回查, 并往HALF主题插入一条重复消息.
- 第一条投入目标队列的消息
当本地事务执行完成, 返回COMMIT
, 就会丢入目标topic中给消费者消费. - 第二条投入目标队列的消息
在定时任务下, MQ再次回查也得到了了COMMIT
的回复, 就也会往目标队列丢一条消息, 就造成了重复消息.
如下图所示:
避免办法: 尽量保证本地事务在一分钟内完成
2.2 消费者
2.2.1 消费重试
消费者如果在消费的时候抛出异常, 会尝试重新消费, 间隔时间随次数增加而变长. 默认会重试16次. 失败以后丢入死信队列中.
2.2.2 批量拉取
消费者可以设置PullBatchSize
参数, 表示从每个队列中一次拉取的消息数. 而一个broker默认有4个(broker默认是创建8个队列, 但是生产者的DefaultMQProducer
默认创建4个, 以少的为准)
2.2.3 无法消费消息, 消息的状态是NOT_CONSUME_YET
这个我也是一开始并不太清楚下面的注解中, consumerGroup
消费组和topic
之间的关系, 在测试事务消费的时候, 直接复制拿过来用, 改了下topic. 也就是同一个组下订阅了两个topic
@RocketMQMessageListener(
topic = "queue_test_topic",
selectorExpression = "*",
consumerGroup = "queue_group_test",
maxReconsumeTimes=2
)
结果就出现了上面说的有些消息无法消费的问题, 看了下控制台, 消费者那边, 发现queue_test_topic
的消费终端是0个, 而queue_test_tc_topic
的消费终端是2个
再看queue_test_topic
的队列中, 自然是没有消费端, 而queue_test_tc_topic
的队列中, 4个队列(默认4个队列)中只有2个队列有订阅.
原则同一个组, 订阅关系必须一样
RocketMQ里, 消费组group和订阅关系信息是一个Map. 如果消费者实例A和B同在一个组里, 分别订阅了a和b, 那么在注册的时候晚加载的B-b关系就会覆盖A-a, 导致A和B都订阅b.
上面我的情况就是普通消费者和事务消费者最后都订阅了queue_test_tc_topic
错误的情况:
正常的情况:
- 如果 同一个组, 多个消费者, 订阅同一个topic:
可以
, 会把消费者自动分配到topic的每个queue - 如果 同一个组, 多个消费者, 订阅不同topic:
不行
, 会混乱, 导致topic订阅关系被覆盖, 且topic里queue的匹配关系也会分配不全. 导致消息无法被消费 - 如果 同两个组, 各自一个消费者, 分别订阅两个topic:
可以
, 会自动将唯一一个消费者匹配到所有queue队列 - 如果 不同组, 订阅同一个topic:
可以
. RocketMQ会让每个组的消费者都自动分配到topic的每个queue, 可以同时消费到消息