RabbitMQ 一二事(2) - 工作队列使用

上篇文章讲了简单队列的使用,这其实就是RMQ给的demo,实际并没有什么用

本篇讲讲工作模式队列,也称之为任务队列

一个生产者发布了多条消息,消费者A可以接受消息,接受消息后该消息就消除,消费者B可以接受其他消息

使用场景,一些数据库操作比较缓慢的话可以分别给多个接口调用,降低压力,或者抢单场景也能考虑,

比如就10个商品,100个消费者来抢单,前10个抢到了后,消息队列就为空了,那么第11个以后的所有消费者都不会抢到

RabbitMQ 一二事(2) - 工作队列使用

代码示例:

生产者

 1 public class Send {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6         // 获取到连接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9 
10         // 声明队列
11         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12 
13         for (int i = 0; i < 50; i++) {
14             // 消息内容
15             String message = "" + i;
16             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
17             System.out.println(" [x] Sent '" + message + "'");
18 
19             Thread.sleep(i * 10);
20         }
21 
22         channel.close();
23         connection.close();
24     }
25 }

 

消费者1

 1 public class Recv {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6 
 7         // 获取到连接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10 
11         // 声明队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13 
14         // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者
15         channel.basicQos(1);
16 
17         // 定义队列的消费者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 监听队列,手动返回完成 设置fasle代表需要手动返回消息的确认状态
20         channel.basicConsume(QUEUE_NAME, false, consumer);
21 
22         // 获取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [x] Received '" + message + "'");
27             // 休眠
28             Thread.sleep(10);
29             // 手动确认  返回确认状态
30             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }
33 }

 

消费者2

 1 public class Recv2 {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6 
 7         // 获取到连接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10 
11         // 声明队列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13 
14         // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者
15         channel.basicQos(1);
16 
17         // 定义队列的消费者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 监听队列,手动返回完成状态 设置fasle代表需要手动返回消息的确认状态
20         channel.basicConsume(QUEUE_NAME, false, consumer);
21 
22         // 获取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [x] Received '" + message + "'");
27             // 休眠1秒
28             Thread.sleep(1000);
29             // 手动确认  返回确认状态
30             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }
33 }

 

上一篇:Linux从入门到精通(第2章--Linux安装)


下一篇:备战阿里面试一年半顺利通过二面,面对突如其来的疫情,让我的阿里三面搁浅ing