一、RabbitMQ消息确认机制
RabbitMQ的消息确认有两种:
1、对生产端发送消息的确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
2、对消费端消费消息的确认。这种是确认消费者是否成功消费了队列中的消息。
二、RabbitMQ对生产端发送消息的确认
rabbitmq对生产端发送消息的确认分为事务和实现confirm机制。不过一般不使用事务,性能消耗太大。对生产端的confirm机制参见:https://www.cnblogs.com/alan6/p/11483419.html
三、消费端消费消息后对RabbitMQ的确认
为了保证消息能可靠到达消费端,RabbitMQ也提供了消费端的消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
消费端消息的确认分为:自动确认(默认)、手动确认、不确认
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
手动确认在spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
1、消费成功手动确认方法:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量确认。true:将一次性ack所有小于deliveryTag的消息。
消费者成功处理消息后,手动调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行消费确认。
try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 手动确认消息 System.out.println("投递消息确认成功,tag:"+message.getMessageProperties().getDeliveryTag()); } catch (IOException e) { e.printStackTrace(); }
2、消费失败手动确认方法:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:该消息的index。
multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index。
requeue:被拒绝的是否重新入队列。
channel.basicNack 方法与 channel.basicReject 方法区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。
四、消费者手动确认可能出现的问题
1、 消息无法ack
2、无效消息循环重入队列
以上两个问题其实属于同一类问题,都需要我们确保代码在消费消息后,一定要通知到MQ。如果消息消费成功,则调用channel.basicAck正常通知mq;如果消息消费过程中产生异常,则调用channel.basicNack或者channel.basicReject确认消费失败,并根据不同的异常类型,选择消息要不要重入队列。