public class Consumer2 { public static void main(String[] args) throws Exception { //1. 创建连接工厂; //2. 创建连接;(抽取一个获取连接的工具类) Connection connection = ConnectionUtil.getConnection(); //3. 创建频道; final Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); System.out.println("fair的方式consumer2开始消费"); //每次可以预期多少个消息 channel.basicQos(1); //5. 创建消费者(接收消息并处理消息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息 System.out.println("消费者2----接收到的消息为:" + new String(body, "utf-8")); try { Thread.sleep(5); //确认消失 /* * 参数1:消息id * 参数2:false表示只有当前这条被处理 * */ channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //6. 监听队列 /** * 参数1:队列名 * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除; * 如果设置为false则需要手动确认 * 参数3:消费者 */ channel.basicConsume(Producer.QUEUE_NAME, false, defaultConsumer); } }
⑦. 模式总结
①. 简单模式:一个生产者一个队列,一个消息被一个消费者接受
②. 工作队列模式:一个队列可以有多个消费者;消费者之间是竞争关系
③. 发布与订阅模式:使用了广播的交换机(fanout),可以将一个消息发送到所有有队列对应的消费者
④. 路由模式:使用了定向交换机(Direct);根据消息路由key与队列路由key进行比较,一致则队列可接收消息
⑤. 通配符模式:使用了通配符(Topic)交换机,根据消息路由key与队列路由key进行匹配(#,*),一致则队列可接收消息
⑥. 模式官网