阿里云mq消息在业务逻辑处理失败之后的处理方式,如官方文档。
本文的目的是结合实际业务场景验证阿里公共云MQ产品的消息消费重试机制。
以下共采用以下三种测试场景,重试失败之后的重试次数和间隔时间,消息消费超时情况下的重试策略,应用不稳定情况下的消息重试情况。
以阿里云mq公网环境为例,测试配置如下:
public static final String TOPIC = "topic_yb"; public static final String PRODUCER_ID = "PID_yinbing"; public static final String CONSUMER_ID = "CID_yinbing"; public static final String ACCESS_KEY = "xxxx"; public static final String SECRET_KEY = "xxxx"; public static final String TAG = "mq_test_tag"; public static final String ONSADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"; |
1 测试方案一
1.1 测试用例
1ã生产者发送一条普通消息。
2ã消费者订阅消息,模拟消费失败,并直接返回 Action.ReconsumeLater 。
3ã观察现象:当消费者未能正常消费时,观察消息重试次数及时间间隔。
1.2 测试代码
1ã生产者代码如下:
String msgpre = "mq send exception message test 1"; Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, msgbody.getBytes()); try { SendResult sendResult = producer.send(message); assert sendResult != null; System.out.println(new Date() + ", msgid is "+ sendResult.getMessageId()); } catch (ONSClientException e) { System.out.println("发送失败"); } |
代码描述:发送1次普通消息
2ã消费者代码:
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl() { @Override public Action consume(Message message, ConsumeContext consumeContext) { System.out.println(new Date() + ", msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes()); return Action.ReconsumeLater; } } |
代码描述:消费者订阅该消息,模拟消费失败,直接返回Action.ReconsumeLater
1.3 测试过程
1ã生产者发送消息
代码日志描述:发送1次普通消息
2ã消费者订阅消息
3ã消息重试间隔时间
消费序次 |
消费时间 |
消费间隔 |
官网文档间隔 |
0 |
13:44:52 |
|
|
1 |
13:45:19 |
27秒 |
10秒 |
2 |
13:45:49 |
30秒 |
30秒 |
3 |
13:46:49 |
1分 |
1分 |
4 |
13:48:49 |
2分 |
2分 |
5 |
13:51:49 |
3分 |
3分 |
6 |
13:55:49 |
4分 |
4分 |
7 |
14:00:49 |
5分 |
5分 |
8 |
14:06:50 |
6分1秒 |
6分 |
9 |
14:13:59 |
7分9秒 |
7分 |
10 |
14:21:59 |
8分 |
8分 |
11 |
14:30:59 |
9分 |
9分 |
12 |
14:41:00 |
10分1秒 |
10分 |
13 |
15:01:00 |
20分 |
20分 |
14 |
15:31:00 |
30分 |
30分 |
15 |
16:31:00 |
1小时 |
1小时 |
16 |
18:31:00 |
2小时 |
2小时 |
17 |
无 |
|
|
4ã消息轨迹查询
查询重试的消息,包括16次重试次数,一共消费了17次
1.4 测试结果
经测试,消费者短时间内返回消费失败结果的情况下,得出如下结论:
1ã消费者未能正常消费时,MQ将重新投递消息;
2ã消费者重试时间间隔与阿里云官方文档基本一致;
3ã消费者重试超过16次之后不再进行重试,与阿里云官方文档一致;
2 测试方案二
2.1 测试用例
1、生产者发送一条普通消息。
2、消费者订阅该消息,并睡眠3分钟(模拟复杂业务消费),消息消费超时时间设置为2分钟。
3、观察消费超时情况下的消息重试策略。
2.2 测试代码
1、生产者代码如下:
代码与方案一一致
2、消费者代码:
consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListener() { @Override public Action consume(Message message, ConsumeContext consumeContext) { System.out.println(new Date() + ", sleep begin, msgid is " + message.getMsgID()); try { Thread.sleep(3*60*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + ", sleep end, msgid is " + message.getMsgID()); System.out.println(new Date() + ", consume commit, msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes()); return Action.CommitMessage; } }); |
代码描述:消费者订阅并消费消息,并睡眠3分钟。
2.3 测试过程
1、生产者发送消息
代码日志描述:发送一次普通消息
2、消费者订阅消息
消费者19:34:23第一次消费消息,sleep 3分钟后,19:37:23 客户端向mq服务端发送消息消费成功的确认。
3、查看消息订阅情况
这段时间,2分钟的消费超时机制并没有被触发,此时查看消息的消费记录,发现该消息id只消费一次,处于消费成功的状态。
2.4 测试结果
经测试,消费者执行长时间后(3分钟),返回MQ消费成功信息情况下,得出如下结论:
1、消费者未能在MQ超时时间窗口内返回消费成功的信息时,MQ未重新投递消息;
2、阿里云官方文档描述” 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费”,与实际验证结果不符。
3 测试方案三
3.1 测试用例
1、在消息重试投递的情况下。
2、消费者订阅该消息,并重启应用。
3、观察现象:应用重启时,是否会出现瞬间同时多次重复消费同一条消息的情况。
3.2 测试代码
测试代码与方案一代码一致
3.3 测试过程
1、生产者代码发送消息
代码日志描述:发送5条普通消息
2、消费者订阅消息,消费失败,导致消息重复投递
3、重启消费端应用,查看重新投递的消息
发现,5条消息在第三次重复投递的过程中,投放了15条消息
3.4 测试结果
经测试,消息重复投递的情况下,应用重启后,得出如下结论:
1、应用重启瞬间,同时多次重复消费同一条消息;