Java-RabbitMq-死信队列

RabbitMq死信队列有三种发生情况:

  1. 消息 TTL 过期
  2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

Java-RabbitMq-死信队列
生产者:

import com.cn.mq.utils.ConnectionMq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

/**
 * @Description 死信
 * @ClassName Producer
 * @Author yangff
 * @date 2021.09.28 22:52
 */
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = ConnectionMq.getChannel()) {
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //设置消息的 TTL 时间 10s=10000ms
            //消息被拒绝情况下不使用,手动进行消息确认
//            AMQP.BasicProperties properties = new
//                    AMQP.BasicProperties().builder().expiration("10000").build();
            //该信息是用作演示队列个数限制
            for (int i = 1; i <11 ; i++) {
                String message="info"+i;
                   //消息被拒绝情况下不使用,手动进行消息确认
//                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
//                        message.getBytes());
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
                        message.getBytes());
                System.out.println("生产者发送消息:"+message);
            }
        }
    }
}

1.消息 TTL 过期
消费者1:


import com.cn.mq.utils.ConnectionMq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description
 * @ClassName Consumer01
 * @Author yangff
 * @date 2021.09.28 22:53
 * 启动之后关闭该消费者 模拟其接收不到消息
 */
public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = ConnectionMq.getChannel();
        //声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定死信交换机与 routingkey
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");


        //正常队列绑定死信队列信息
        Map<String, Object> params = new HashMap<>();
        //过期时间 10s=100000ms;可在生产者出设置
//        params.put("x-message-ttl", 10000);
        //正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueue = "normal-queue";
        //声明普通队列
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息"+message);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

消费者2:

import com.cn.mq.utils.ConnectionMq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description
 * @ClassName Consumer02
 * @Author yangff
 * @date 2021.09.28 22:54
 */
public class Consumer02 {
    private static final String DEAD_EXCHANGE = "dead_exchange";
    public static void main(String[] argv) throws Exception {
        Channel channel = ConnectionMq.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        System.out.println("等待接收死信队列消息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println("Consumer02 接收死信队列的消息" + message);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
        });
    }
}

2.队列达到最大长度
设置最大长度在生产者和消费者都可定义
生产者:
Java-RabbitMq-死信队列
消费者:
Java-RabbitMq-死信队列

3.消息被拒绝
配置消息拒绝时,需要进行手动进行消息确认,将生产者中设置消息的 TTL 时间取消,在消费者中将autoAck设置false;
Java-RabbitMq-死信队列

上一篇:RabbitMq 的理论及应用示例(一)


下一篇:springboot整合rabbitMQ