springboot整合RabbitMQ消费端手动ACK确认机制

ack——acknowledge(vt. 承认;答谢;报偿;告知已收到;确认的意思),在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息

其提供了三种确认方式:

  • 自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

  • 手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

  • 根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等布拉不拉布拉)。这种方式比较麻烦。

当消息一旦被消费者接收到,会立刻自动向MQ确认接收,并将响应的message从RabbitMQ消息缓存中移除,但是在实际的业务处理中,会出现消息收到了,但是业务处理出现异常的情况,在自动确认的模式下,该条业务处理失败的message就相当于被丢弃了。

如果设置了手动确认,则需要在业务处理完成之后,手动调用channel.basicAck(),手动的签收,

如果业务处理失败,则手动调用channel.basicNack()方法拒收,并让MQ重新发送该消息。

如果不做任何关于acknowledge的配置,默认就是自动确认签收的。

消费端手动ACK确认机制

1. 消费端配置:开启ack

springboot整合RabbitMQ消费端手动ACK确认机制

2. 通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收

/*
        通过channel的basicAck方法手动签收,通过basicNack方法拒绝签收
     */
    @Override
    @RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE}) //监听
    public void receiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try{
            //String message= (String)amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
            System.out.println("接收的mq消息:"+message);

            // 业务处理 异常测试
            // System.out.println("业务处理"+1/0);

            // long deliveryTag 消息接收tag boolean multiple 是否批量确认
            System.out.println("deliveryTag="+deliveryTag);

            /**
             * 无异常就确认消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:取出来当前消息在队列中的的索引;
             * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
             * deliveryTag为5及其以下的消息;一般设置为false
             */

            if(deliveryTag==5){
                channel.basicAck(deliveryTag,true);
            }

        }catch (Exception e){

            e.printStackTrace();

            /**
             * 有异常就绝收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
             *         false:将消息丢弃
             */

            // long deliveryTag, boolean multiple, boolean requeue

            try {

                channel.basicNack(deliveryTag,false,true);
                // long deliveryTag, boolean requeue
                // channel.basicReject(deliveryTag,true);

                Thread.sleep(1000);     // 这里只是便于出现死循环时查看

                /*
				 * 一般实际异常情况下的处理过程:记录出现异常的业务数据,将它单独插入到一个单独的模块,
				 * 然后尝试3次,如果还是处理失败的话,就进行人工介入处理
				*/

            } catch (Exception e1) {
                e1.printStackTrace();
            }

        }

3. 测试

3.1 测试消费1条消息

springboot整合RabbitMQ消费端手动ACK确认机制
springboot整合RabbitMQ消费端手动ACK确认机制
由于代码里面deliveryTag为5才会确认,所以这条消息Unacked,如下图:
springboot整合RabbitMQ消费端手动ACK确认机制

3.2 测试消费5条消息

springboot整合RabbitMQ消费端手动ACK确认机制
springboot整合RabbitMQ消费端手动ACK确认机制

上一篇:三次握手和四次挥手详解


下一篇:新一代容器平台ACK Anywhere,来了