Official website of RabbitMQ: https://www.rabbitmq.com/
work pattern of RabbitMQ: https://www.rabbitmq.com/getstarted.html
pattern-one:simple pattern
Producer-code:
public class Producer { static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/yasuo"); connectionFactory.setUsername("abert"); connectionFactory.setPassword("abert"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); /* 参数1:队列名称 参数2:是否定义持久化队列(消息会持久化保存在服务器上) 参数3:是否独占本连接 参数4:是否在不使用的时候队列自动删除 参数5:其他参数 */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); /* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机) 参数2:路由key,简单模式可以使用队列名称 参数3:消息其他属性 参数4:消息内容 */ String message = "你好呀!可爱的小尾巴"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("列车已经出发"); channel.close(); connection.close(); } }
ConnectionUtil-code:
public class ConnectionUtil { public static Connection getConn() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/vh1"); connectionFactory.setUsername("abert"); connectionFactory.setPassword("abert"); return connectionFactory.newConnection(); } }
Consumer-code:
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); //创建消费者(接受消息并处理消息,重写方法) DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("消息线路为:"+envelope.getRoutingKey()); //交换机 // System.out.println("交换机为:"+envelope.getExchange()); //消息id // System.out.println("消息id为:"+envelope.getDeliveryTag()); //接受到的消息 System.out.println("接受到的内容为:"+new String(body,"utf-8")); } }; /* 参数1:队列名 参数2:是否自动确认;设置为true表示消息接收到了 自动向MQ回复,MQ将其从消息队列中删除 设置为false表示需要手动确认 参数3:消费者 */ channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer); } }
Result:
通过结果可以看出,对于简单模式,说白了就是消息生产者到消息消费者端对端的通信,但这里可能会产生个误解,包括官方图也是这样画的,那就是消息生产者直接发送消息到队列,然后消费者从队列中取消息。
实际情况是,消息必须经由交换机然后转发到对应队列,这里使用了默认交换机。
pattern-two:work queues pattern
Producer-code:
public class Producer { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i =0;i<30;i++) { String message = "你好呀!"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } channel.close(); connection.close(); } }
Consumer1-code:
public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); //每次可以预取多少个消息 channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("消费者1内容为:"+new String(body,"utf-8")); Thread.sleep(1000); //确认消息 /* 参数1:消息id 参数2:false表示只有当前这条消息被处理 */ channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer); } }
Consumer2-code:
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("消费者2内容为:"+new String(body,"utf-8")); Thread.sleep(1000); channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer); } }
Result:
对于工作队列模式,生产者生成了30条信息, 消费者1全为奇数,消费者2全为偶数,所以消费者之间是竞争关系。至于为什么一边全是偶数一边全是奇数,一种可能是时间差的巧合,也有可能消息分发本身采用轮询机制,确保每个消费者都能拿到消息。
目前没遇到项目落地情况,所以就不做进一步探讨。
pattern-three:publish/subscribe
public class Producer { //交换机名称 static final String FANOUT_EXCHANGE = "fanout_exchange"; //定义队列一名称 static final String FANOUT_QUEUE_1 = "fanout_queue_1"; //定义队列二名称 static final String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); Channel channel = connection.createChannel(); //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic) channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null); channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null); // 队列绑定到交换机(参数1:队列名称,参数2:交换机名称,参数3:路由key) channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,""); channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,""); for (int i =0;i<10;i++) { String message = "你好呀!"+i; /** * 参数1:交换机名称;如果没有则指定空字符串(表示默认的交换机) * 参数2:路由key,简单模式中可以使用队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(FANOUT_EXCHANGE,"", null, message.getBytes()); System.out.print(i+","); } channel.close(); connection.close(); } }
Consumer1-code:
public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null); //绑定交换机 channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.FANOUT_QUEUE_1,true,defaultConsumer); } }
Consumer2-code:
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null); //绑定交换机 channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer); } }
Result:
可以看出,在发布与订阅模式,交换机会把消息给每个绑定的队列,所以消费者之间不存在竞争关系
pattern-four:routing
Producer:
public class Producer { //交换机名称 static final String DIRECT_EXCHANGE = "direct_exchange"; //定义队列一名称 static final String DIRECT_QUEUE_1 = "direct_queue_1"; //定义队列二名称 static final String DIRECT_QUEUE_2 = "direct_queue_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); Channel channel = connection.createChannel(); //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic) channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DIRECT_QUEUE_1,true,false,true,null); channel.queueDeclare(DIRECT_QUEUE_2,true,false,true,null); // 队列绑定到交换机(参数1:队列名称,参数2:交换机名称,参数3:路由key) channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"1"); channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"2"); String message1 = "你好呀!路由key为1"; String message2 = "你好呀!路由key为2"; String message3 = "要吃烧饼吗?路由key为2"; channel.basicPublish(DIRECT_EXCHANGE,"1", null, message1.getBytes()); channel.basicPublish(DIRECT_EXCHANGE,"2", null, message2.getBytes()); channel.basicPublish(DIRECT_EXCHANGE,"2", null, message3.getBytes()); System.out.print("send over"); channel.close(); connection.close(); } }
Consumer1:
public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT); channel.queueDeclare(Producer.DIRECT_QUEUE_1,true,false,true,null); //绑定交换机 channel.queueBind(Producer.DIRECT_QUEUE_1,Producer.DIRECT_EXCHANGE,"1"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.DIRECT_QUEUE_1,true,defaultConsumer); } }
Consumer2:
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT); channel.queueDeclare(Producer.DIRECT_QUEUE_2,true,false,true,null); //绑定交换机 channel.queueBind(Producer.DIRECT_QUEUE_2,Producer.DIRECT_EXCHANGE,"2"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.DIRECT_QUEUE_2,true,defaultConsumer); }
Result:
对于路由模式,交换器按照队列指定的路由key进行消息分发,同时也侧面印证了消息是发给交换器而不是直接发送给队列的
pattern-five:topics
Producer-code:
public class Producer { //交换机名称 static final String TOPIC_EXCHANGE = "topic_exchange"; //定义队列一名称 static final String TOPIC_QUEUE_1 = "topic_queue_1"; //定义队列二名称 static final String TOPIC_QUEUE_2 = "topic_queue_2"; //定义队列二名称 static final String TOPIC_QUEUE_3 = "topic_queue_3"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); Channel channel = connection.createChannel(); //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic) channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); String message1 = "你好呀!路由key为1.1"; String message2 = "你好呀!路由key为2.1"; String message3 = "要吃烧饼吗?路由key为2.2"; String message4 = "要吃烧饼吗?路由key为2.2.2";
channel.basicPublish(TOPIC_EXCHANGE,"1.1", null, message1.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.1", null, message2.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.2", null, message3.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.2.2", null, message4.getBytes()); System.out.print("send over"); channel.close(); connection.close(); } }
Consumer1-code:
public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,true,null); //绑定交换机 channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"1.1"); channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"2.1"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer); } }
Consumer2-code:
public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC); channel.queueDeclare(Producer.TOPIC_QUEUE_2,true,false,true,null); //绑定交换机 channel.queueBind(Producer.TOPIC_QUEUE_2,Producer.TOPIC_EXCHANGE,"2.*"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.TOPIC_QUEUE_2,true,defaultConsumer); } }
Consumer3-code:
public class Consumer3 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConn(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(Producer.TOPIC_QUEUE_3,true,false,true,null); //绑定交换机 channel.queueBind(Producer.TOPIC_QUEUE_3,Producer.TOPIC_EXCHANGE,"2.#"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者3内容为:"+new String(body,"utf-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(Producer.TOPIC_QUEUE_3,true,defaultConsumer); } }
Result:
对于topic模式,Consumer1队列绑定了两个路由key,侧面印证了队列可以绑定多个路由key,Consumer2和Consumer3都使用通配符*或#
总结:
RabiitMQ五种模式演示完毕,对于RabbitMQ,主要需要理清连接(Connection),信道(Channel),交换机(exchange),路由键(routing key),队列(queue)四者的关系,包括下图Rabbit服务界面也主要由这4部分组成(见页面菜单栏)
一、通信的流程
1.在主线程中,首先和RabbitMQ服务器建立一个连接,这个连接需要指定ip:port,具体虚拟机,同时还需要用户名,密码作为验证。
2.连接建立后,需要创建一个信道,通过信道发送消息到具体交换机。
3.交换机(本质就是一张路由查询表)通过路由键将消息分发给满足路由key条件的队列。
4.消费者通过监听队列拿出消息。
二、通信的基本单位
RabbitMQ连接是基于TCP的,在一条TCP上建立了需多信道(channel),每一条信道由一个独立的线程维护,所以信道是通信的基本单位。通过下面代码及方法也可以看出
channel.exchangeDeclare
channel.queueDeclare
channel.queueBind
channel.basicPublish
交换机的声明,队列的声明,交换机和队列的绑定,队列和路由键的绑定,消息发送(指明交换机,路由键,消息内容)都在信道方法中完成,因此信道是通信的基本单位。
三、交换机的三种模式(Header不常用)
Fanout:广播,只要绑定到交换机的队列都会分发消息,即1对N。
Direct:定向,只有消息路由key与队列路由key一致交换机才会分发消息,即1对1。(一个队列可以指定多个路由key)
Topic:匹配,*占位一个词,#占位多个词,只要能够匹配交换机就会分发消息,即N对1。