消费端消息重试实现
生产端消息重试
重试两次
消费端消息重试
重试16次 然后加入死信
消费端模拟重试代码
1. @Component 2. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { 3. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); 4. /** 5. * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/> 6. * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS 7. */ 8. @Override 9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 10. if(CollectionUtils.isEmpty(msgs)){ 11. logger.info("接受到的消息为空,不处理,直接返回成功"); 12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 13. } 14. MessageExt messageExt = msgs.get(0); 15. logger.info("接受到的消息为:"+messageExt.toString()); 16. if(messageExt.getTopic().equals("NewMessage")){ 17. if(messageExt.getTags().equals("TagA")){ 18. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重) 19. 20. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 21. System.out.printf(df.format(new Date()) + ", %s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 22. int reconsume = messageExt.getReconsumeTimes(); 23. System.out.println("重试的次数为"+reconsume); 24. return ConsumeConcurrentlyStatus.RECONSUME_LATER; 25. 26. //TODO 获取该消息重试次数 27. //int reconsume = messageExt.getReconsumeTimes(); 28. //if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功 29. // //记录日志 30. // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 31. //} 32. //TODO 处理对应的业务逻辑 33. 34. 35. } 36. } 37. // 如果没有return success ,consumer会重新消费该消息,直到return success 38. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 39. } 40. 41. 42. }
用以上代码模拟消费端的重试 ,每一次重试的时间与延时队列时间等级一样(16次时间还是比较长的)
重试16次之后投入死信
然后通过业务进行判断 如流水号等