一、基本模型
P代表生产者,就是发消息的一方。
X代表的是路由器交换机,它负责接收发送者发送的消息并将消息转发到订阅它的所有队列上。
红色部分是队列。它如果对某个交换机感兴趣的话,那么就可以把自己绑定到这个交换机上,专业术语叫绑定。
交换机X在收到任何消息后,都会直接将消息分发给订阅它的队列。这相当于发布订阅模式。这是一种最直接的广播形式,直接不BB,路由器接收到什么消息,就转发什么消息到所有订阅它的队列中。
在RabbitMQ中,Fanout类型的交换机(路由器)将会直接将数据转发到绑定它的队列中。
二、使用代码
共三个类,第一个类是发送者,第二个和第三个类是两个接收者。
代码大致思路:
我们创建了一个Fanout类型的交换机。再创建了两个队列,这两个队列都绑定了这个Fanout类型的交换机。
然后我们给这个交换机发送消息,查看绑定这个交换机的队列们是否可以接收到数据。
发送者.java
package com.safesoft.fanoutexchanger.mq02;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
/**
* @author jay.zhou
* @date 2019/4/23
* @time 10:10
*/
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String EXCHANGE_NAME = "local::mq02:exchange:e01";
public static void main(String[] args) {
try {
//连接管理器:我们的应用程序与RabbitMQ建立连接的管理器。
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ服务器地址
factory.setHost("127.0.0.1");
//设置帐号密码,默认为guest/guest,所以下面两行可以省略
factory.setUsername("guest");
factory.setPassword("guest");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个信道
Channel channel = connection.createChannel();
//通过信道创建一个交换机
/**
* 第一个参数是交换机的名称
* 第二个参数是交换机的类型 Fanout:
* fanout:直接把队列绑定到路由器。路由器在收到消息后,直接把消息投递到队列中,不需要路由键。
*/
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
//消息用二进制的方式传输
final String message = "中国联通股票价格又跌了,不开心";
final byte[] msg = message.getBytes("UTF-8");
//直接把消息发送到路由器。路由在收到消息后,直接投递到订阅这个路由器的队列
channel.basicPublish(EXCHANGE_NAME, "", null, msg);
LOGGER.info("消息发送成功:{}", message);
channel.close();
connection.close();
} catch (Exception e) {
LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
}
}
}
接收者一号.java
package com.safesoft.fanoutexchanger.mq02;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
/**
* @author jay.zhou
* @date 2019/4/23
* @time 10:49
*/
public class Consume01 {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String EXCHANGE_NAME = "local::mq02:exchange:e01";
private static final String QUEUE_NAME_01 = "local::mq02:queue:q01";
private static final String QUEUE_NAME_02 = "local::mq02:queue:q02";
public static void main(String[] args) {
try {
//设置RabbitMQ服务器信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//申明一个队列
/**
* 第一个参数是queue:要创建的队列名
* 第二个参数是durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
* 第三个参数是exclusive:true表示一个队列只能被一个消费者占有并消费
* 第四个参数是autoDelete:true表示服务器不在使用这个队列是会自动删除它
* 第五个参数是arguments:其它参数
*/
channel.queueDeclare(QUEUE_NAME_01, true, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
//在fanout类型的路由器中,路由键无效,所以设计为空字符串
final String routeKey = "";
//将这个队列订阅到这个路由器上,表示这个队列对这个路由器感兴趣
channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, routeKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
LOGGER.info("收件人一号收到消息:{}", message);
}
};
//队列一确认收到消息
channel.basicConsume(QUEUE_NAME_01, true, consumer);
} catch (Exception e) {
LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
}
}
}
接收者2号.java
package com.safesoft.fanoutexchanger.mq02;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
/**
* @author jay.zhou
* @date 2019/4/23
* @time 10:49
*/
public class Consume02 {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String EXCHANGE_NAME = "local::mq02:exchange:e01";
private static final String QUEUE_NAME_01 = "local::mq02:queue:q01";
private static final String QUEUE_NAME_02 = "local::mq02:queue:q02";
public static void main(String[] args) {
try {
//设置RabbitMQ服务器信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//申明一个队列
/**
* 第一个参数是queue:要创建的队列名
* 第二个参数是durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
* 第三个参数是exclusive:true表示一个队列只能被一个消费者占有并消费
* 第四个参数是autoDelete:true表示服务器不在使用这个队列是会自动删除它
* 第五个参数是arguments:其它参数
*/
channel.queueDeclare(QUEUE_NAME_02, true, false, false, null);
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
//在fanout类型的路由器中,路由键无效,所以设计为空字符串
final String routeKey = "";
//将这个队列订阅到这个路由器上,表示这个队列对这个路由器感兴趣
channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, routeKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
LOGGER.info("收件人二号收到消息:{}", message);
}
};
//队列一确认收到消息
channel.basicConsume(QUEUE_NAME_02, true, consumer);
} catch (Exception e) {
LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
}
}
}
首先运行一次发送者类Producer,可以先多运行几次。我就运行了两次。
消息是:消息发送成功:"中国联通股票价格又跌了,不开心"。表示当我写这篇文章的时候,我又在股票市场亏钱了。
分别运行接收者类,接收者一号和二号均收到股票下跌的噩耗(话说我炒股怎么老亏钱呀)。
三、总结
Fanout类型的路由器,不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
四、源代码下载
源代码地址:https://github.com/hairdryre/Study_RabbitMQ
下一篇:第二节 Fanout路由交换机:订阅发布模式