一、可靠性问题分析
消息的可靠性投递是使用消息中间件不可避免的问题,不管是使用哪种MQ都存在这种问题,接下来要说的就是在RabbitMQ中如何解决可靠性问题;在前面
在前面说过消息的传递过程中有三个对象参与分别是:生产者、RabbitMQ(broker)、消费者;接下来就是要围绕这三个对象来分析消息在传递过程中会在哪些环节出来可靠性问题;
RabbitMQ消息的可靠性投递主要两种实现: 1、通过实现消费的重试机制,通过@Retryable来实现重试,可以设置重试次数和重试频率; 2、生产端实现消息可靠性投递。 两种方法消费端都可能收到重复消息,要求消费端必须实现幂等性消费。
1.1、生产者丢失消息
生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种:
1.事务
2.confirm机制
1.1.1、事务
RabbitMQ提供了事务功能,也即在生产者发送数据之前开启RabbitMQ事务,然后再发送消息,如果消息没有成功发送到RabbitMQ,那么就抛出异常,然后进行事务回滚,回滚之后再重新发送消息,如果RabbitMQ接收到了消息,那么进行事务提交,再开始发送下一条数据。
优点
保证消息一定能够发送到RabbitMQ中,发送端不会出现消息丢失的情况;
缺点
事务机制是阻塞(同步)的,每次发送消息必须要等到mq回应之后才能继续发送消息,比较耗费性能,会导致吞吐量降下来
1.1.2、confirm模式
基于事务的特性,作为补偿,RabbitMQ添加了消息确认机制,也即confirm机制。confirm机制和事务机制最大的不同就是事务是同步的,confirm是异步的,发送完一个消息后可以继续发送下一个消息,mq接收到消息后会异步回调接口告知消息接收结果。生产者开启confirm模式后,每次发送的消息都会分配一个唯一id,如果消息成功发送到了mq中,那么就会返回一个ack消息,表示消息接收成功,反之会返回一个nack,告诉你消息接收失败,可以进行重试。依据这个机制,我们可以维护每个消息id的状态,如果超过一定时间还是没有接收到mq的回调,那么就重发消息。
1.1.3、confirm模式代码演示
其实这块代码在前面几篇文章的代码中有体现过;下面以springboot集成的方式再演示一种
pom.xml文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--web包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
spring: rabbitmq: #rabbitmq 连接配置 publisher-confirm-type: correlated # 开启confirm确认模式 host: 192.168.0.1 port: 5672 username: admin password: admin server: port: 8081实现confirm方法 实现ConfirmCallback接口中的confirm方法,消息只要被 rabbitmq broker接收到就会触ConfirmCallback 回调,ack为true表示消息发送成功,ack为false表示消息发送失败
@Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /*** * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 * */ @Override public void confirm(CorrelationData correlationData ,boolean ack ,String cause) { if (ack){ //消息发送成功 System.out.println ("消息发送成功到交换机"); }else{ System.out.println ("发送失败"+cause); } } }定义 Exchange 和 Queue 定义交换机 confirmTestExchange 和队列 confirm_test_queue ,并将队列绑定在交换机上。
/** * 定义队列和交换机 */ @Configuration public class QueueConfig { @Bean(name="confirmTestExchange") public FanoutExchange confirmTestExchange(){ return new FanoutExchange("confirmTestExchange",true,false); } @Bean(name = "confirmTestQueue") public Queue confirmTestQueue(){ return new Queue("confirm_test_queue",true,false,false); } @Bean public Binding confirmTestFanoutExcangeAndQueue(@Qualifier("confirmTestQueue")Queue queue,@Qualifier("confirmTestExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }生产者
@RestController @RequestMapping(value = "/producer") @CrossOrigin public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; @GetMapping public void producer(){ rabbitTemplate.setConfirmCallback ( confirmCallbackService ); rabbitTemplate.convertAndSend ( "confirmTestExchange","","测试RabbitTemplate功能" ); } }正确情况,ack返回true,表示投递成功;下面测试下,发一个正常的截图和非正常截图,非正常的截图只用把生产者的交换机名称就行 消息未投递到queue的退回模式
上面演示了消息投放到交换机的案例,下面演示一个消息从 exchange–>queue 投递失败则会返回一个 returnCallback的案例;生产端通过实现ReturnCallback接口,启动消息失败返回,消息路由不到队列时会触发该回调接口
修改yml文件spring: rabbitmq: #rabbitmq 连接配置 publisher-confirm-type: correlated # 开启confirm确认模式 publisher-returns: true #开启退回模式 host: 192.168.0.1 port: 5672 username: admin password: admin server: port: 8081设置投递失败的模式 根据前面文章的讲解可知如果消息没有路由到Queue,则丢弃消息(默认);但开启ReturnCallBack后,如果消息没有路由到Queue,返回给消息发送方ReturnCallBack(开启后)
rabbitTemplate.setMandatory(true);实现returnedMessage方法 启动消息失败返回,消息路由不到队列时会触发该回调接口
@Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { /** * * @param message 消息对象 * @param i 错误码 * @param s 错误信息 * @param s1 交换机 * @param s2 路由键 */ @Override public void returnedMessage(Message message ,int i ,String s ,String s1 ,String s2) { System.out.println("消息对象:" + message); System.out.println("错误码:" + i); System.out.println("错误信息:" + s); System.out.println("消息使用的交换器:" + s1); System.out.println("消息使用的路由key:" + s2); //业务代码处理 } }
yml配置
spring: rabbitmq: #rabbitmq 连接配置 publisher-confirm-type: correlated # 开启confirm确认模式 publisher-returns: true #开启退回模式 host: 124.71.33.75 port: 5672 username: admin password: ghy20200707rabbitmq server: port: 8081
public void producerLose(){ /** *确保消息发送失败后可以重新返回到队列中 */ rabbitTemplate.setMandatory(true); /** * 消息投递确认模式 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); CorrelationData correlationData = new CorrelationData("id_"+System.currentTimeMillis()+""); //发送消息 rabbitTemplate.convertAndSend("directExchange", "RabbitTemplate","测试RabbitTemplate功能" ,correlationData); }
测试接口
1.2、消费者丢失消息
其实在生产者和消费者中间,rabbitmq也是会丢失消息的,解决方案就是持久化存储,这个方案在前面有讲过;所以在这里就跳过;下面直接说消息确认机制ack,ack指Acknowledge确认。 表示消费端收到消息后的确认方式
消费端消息的确认分为:自动确认(默认)、手动确认、不确认- AcknowledgeMode.NONE:不确认
- AcknowledgeMode.AUTO:自动确认
- AcknowledgeMode.MANUAL:手动确认
spring: rabbitmq: #rabbitmq 连接配置 publisher-confirm-type: correlated # 开启confirm确认模式 publisher-returns: true #开启退回模式 host: 124.71.33.75 port: 5672 username: admin password: ghy20200707rabbitmq listener: simple: acknowledge-mode: manual #手动确认 server: port: 8081确认配置
/** * 消费者消息确认机制 */ @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage { @RabbitHandler public void processHandler(String msg,Channel channel,Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消息内容" + new String(message.getBody())); //TODO 具体业务逻辑 // 手动签收[参数1:消息投递序号,参数2:批量签收] channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒绝签收[参数1:消息投递序号,参数2:批量拒绝,参数3:是否重新加入队列] channel.basicNack(deliveryTag, true, true); } } }channel.basicNack 方法与 channel.basicReject 方法区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。测试效果如下:
要想测试异常很简单,在代码加一个报错语句就可以测试了,我这里就不搞事了;
二、消费端限流
假设一个场景,首先,在 Rabbitmq 服务器积压了有上万条未处理的消息,这时随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多数据!当数据量特别大的时候,如果对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,这是用户的行为,用户的行为是不可控的。所以正确的处理方案应该是对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。2.1、TTL
Time To Live,消息过期时间设置 声明队列时,指定即可 TTL:过期时间 1. 队列统一过期 2. 消息单独过期 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。队列过期后,会将队列所有消息全部移除;消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)三、死信队列
死信队列,当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX;关于死信队列的演示代码在第一篇中有上传过;这里就不再演示了;
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;