消费者可以根据信息的内容,拒绝消费这个消息。这需要手动ACK。
生产者代码:
1 package com.yas.deadexchange; 2 3 import com.rabbitmq.client.AMQP; 4 import com.rabbitmq.client.BuiltinExchangeType; 5 import com.rabbitmq.client.Channel; 6 import com.yas.config.RabbitMQClient; 7 import org.junit.Test; 8 9 public class Publisher_NoParams { 10 private static final String NORMAL_EXCHANGE = "normal_exchange"; 11 12 @Test 13 public void publish() throws Exception { 14 try (Channel channel = RabbitMQClient.getChannel()) { 15 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); 16 //设置消息的 TTL 时间 17 //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); 18 //该信息是用作演示队列个数限制 19 for (int i = 1; i <11 ; i++) { 20 String message="info"+i; 21 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes()); 22 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes()); 23 System.out.println("生产者发送消息:"+message); 24 } 25 } 26 } 27 }
普通队列消费者代码:
1 package com.yas.deadexchange; 2 3 import com.rabbitmq.client.BuiltinExchangeType; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.DeliverCallback; 7 import com.yas.config.RabbitMQClient; 8 import org.junit.Test; 9 10 import java.util.HashMap; 11 import java.util.Map; 12 13 public class Consumer_Reject { 14 public static final String NORMAL_EXCHANGE = "normal_exchange"; 15 public static final String DEAD_EXCHANGE = "dead_exchange"; 16 public static final String NORMAL_QUEUE = "normal_queue"; 17 public static final String DEAD_QUEUE = "dead_queue"; 18 19 @Test 20 public void consume() throws Exception { 21 //1.获取连接对象 22 Connection connection = RabbitMQClient.getConnection(); 23 //2.创建channel 24 Channel channel = connection.createChannel(); 25 26 //声明死信交换机 类型为 direct 27 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); 28 //声明死信队列 29 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); 30 //死信队列绑定死信交换机与 routingkey 31 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); 32 33 34 //声明普通交换机 类型为 direct 35 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); 36 //正常队列绑定死信队列信息 37 Map<String, Object> params = new HashMap<>(); 38 //正常队列设置死信交换机 参数 key 是固定值 39 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); 40 //正常队列设置死信 routing-key 参数 key 是固定值 41 params.put("x-dead-letter-routing-key", "lisi"); 42 //声明普通队列 43 channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); 44 //普通队列绑定普通交换机routing-key 45 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); 46 System.out.println("等待接收消息........... "); 47 48 DeliverCallback deliverCallback = (consumerTag, delivery) -> 49 {String message = new String(delivery.getBody(), "UTF-8"); 50 if(message.equals("info5")){ 51 System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息"); 52 //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中 53 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 54 }else { 55 System.out.println("Consumer01 接收到消息"+message); 56 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 57 } 58 }; 59 boolean autoAck = false; 60 channel.basicConsume(NORMAL_QUEUE, autoAck, deliverCallback, consumerTag -> { 61 }); 62 63 System.in.read(); 64 //5/释放资源 65 channel.close(); 66 connection.close(); 67 } 68 }
测试:
先启动生产者,生产者会发送10条信息到normal_queue中。
再启动消费者,消费者会消费其中的9条消息,并拒绝其中的信息为info5的消息。这1条info5会被转发到死信队列中。