阿里云MQ消息重试机制验证


阿里云mq消息在业务逻辑处理失败之后的处理方式,如官方文档

本文的目的是结合实际业务场景验证阿里公共云MQ产品的消息消费重试机制。

以下共采用以下三种测试场景,重试失败之后的重试次数和间隔时间,消息消费超时情况下的重试策略,应用不稳定情况下的消息重试情况。

以阿里云mq公网环境为例,测试配置如下:

public static final String TOPIC = "topic_yb";    

public static final String PRODUCER_ID = "PID_yinbing";    

public static final String CONSUMER_ID = "CID_yinbing";      

public static final String ACCESS_KEY = "xxxx";    

public static final String SECRET_KEY = "xxxx";    

public static final String TAG = "mq_test_tag";

public static final String ONSADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";

 

1 测试方案一

1.1 测试用例

1、生产者发送一条普通消息。

2、消费者订阅消息,模拟消费失败,并直接返回 Action.ReconsumeLater 。

3、观察现象:当消费者未能正常消费时,观察消息重试次数及时间间隔。

1.2 测试代码

1、生产者代码如下:

String msgpre = "mq send exception message test 1";

Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, msgbody.getBytes());

 try {

     SendResult sendResult = producer.send(message);

     assert sendResult != null;

     System.out.println(new Date() + ", msgid is "+ sendResult.getMessageId());

 } catch (ONSClientException e) {

     System.out.println("发送失败");

 }

代码描述:发送1次普通消息

2、消费者代码:

consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListenerImpl() {

   @Override

   public Action consume(Message message, ConsumeContext consumeContext) {

           System.out.println(new Date() + ", msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());

           return Action.ReconsumeLater;

        }

   }

代码描述:消费者订阅该消息,模拟消费失败,直接返回Action.ReconsumeLater

 

 

1.3 测试过程

1、生产者发送消息

代码日志描述:发送1次普通消息

阿里云MQ消息重试机制验证

2、消费者订阅消息

阿里云MQ消息重试机制验证

3、消息重试间隔时间

消费序次

消费时间

消费间隔

官网文档间隔

0

13:44:52

 

 

1

13:45:19

27

10

2

13:45:49

30

30

3

13:46:49

1

1

4

13:48:49

2

2

5

13:51:49

3

3

6

13:55:49

4

4

7

14:00:49

5

5

8

14:06:50

6分1秒

6

9

14:13:59

7分9秒

7

10

14:21:59

8

8

11

14:30:59

9

9

12

14:41:00

10分1秒

10

13

15:01:00

20

20

14

15:31:00

30

30

15

16:31:00

1小时

1小时

16

18:31:00

2小时

2小时

17

 

 

4、消息轨迹查询

查询重试的消息,包括16次重试次数,一共消费了17次

阿里云MQ消息重试机制验证

 

1.4 测试结果

经测试,消费者短时间内返回消费失败结果的情况下,得出如下结论:

1、消费者未能正常消费时,MQ将重新投递消息;

2、消费者重试时间间隔与阿里云官方文档基本一致;

3、消费者重试超过16次之后不再进行重试,与阿里云官方文档一致;

2 测试方案二

2.1 测试用例

1、生产者发送一条普通消息。

2、消费者订阅该消息,并睡眠3分钟(模拟复杂业务消费),消息消费超时时间设置为2分钟。

3、观察消费超时情况下的消息重试策略。

2.2 测试代码

1、生产者代码如下:

 

代码与方案一一致

2、消费者代码:

consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new MessageListener() {

@Override

   public Action consume(Message message, ConsumeContext consumeContext) {

       System.out.println(new Date() + ", sleep begin, msgid is " + message.getMsgID());

       try {

Thread.sleep(3*60*1000);

             } catch (InterruptedException e) {

e.printStackTrace();

             }

           System.out.println(new Date() + ", sleep end, msgid is " + message.getMsgID());

           System.out.println(new Date() + ", consume commit, msgid is " + message.getMsgID() + ", reconsume times is " + message.getReconsumeTimes());

           return Action.CommitMessage;

   }

});

 

代码描述:消费者订阅并消费消息,并睡眠3分钟。

2.3 测试过程

1、生产者发送消息

     代码日志描述:发送一次普通消息

 阿里云MQ消息重试机制验证


2、消费者订阅消息

   消费者19:34:23第一次消费消息,sleep 3分钟后,19:37:23 客户端向mq服务端发送消息消费成功的确认。

阿里云MQ消息重试机制验证

3、查看消息订阅情况

这段时间,2分钟的消费超时机制并没有被触发,此时查看消息的消费记录,发现该消息id只消费一次,处于消费成功的状态。

阿里云MQ消息重试机制验证

 

2.4 测试结果

经测试,消费者执行长时间后(3分钟),返回MQ消费成功信息情况下,得出如下结论:

1、消费者未能在MQ超时时间窗口内返回消费成功的信息时,MQ未重新投递消息;

2、阿里云官方文档描述” 设置每条消息消费的最大超时时间,超过设置时间则被视为消费失败,等下次重新投递再次消费”,与实际验证结果不符。

3 测试方案三

3.1 测试用例

1、在消息重试投递的情况下。

2、消费者订阅该消息,并重启应用。

3、观察现象:应用重启时,是否会出现瞬间同时多次重复消费同一条消息的情况。

3.2 测试代码

测试代码与方案一代码一致

3.3 测试过程

1、生产者代码发送消息

代码日志描述:发送5条普通消息

阿里云MQ消息重试机制验证

2、消费者订阅消息,消费失败,导致消息重复投递

阿里云MQ消息重试机制验证

3、重启消费端应用,查看重新投递的消息

阿里云MQ消息重试机制验证

发现,5条消息在第三次重复投递的过程中,投放了15条消息

 

3.4 测试结果

经测试,消息重复投递的情况下,应用重启后,得出如下结论:

1、应用重启瞬间,同时多次重复消费同一条消息;

 

 

 

上一篇:教你如何拍出好看的风光摄影 理论+前期+后期


下一篇:DRDS向ADS数据迁移指南