一、消息发送流程
广播模式下,消息的发送流程:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发送给那个队列,生产者无法决定
- 交换机把消息发送给绑定过的队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
二、代码实现
生产者代码:
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wmh
* @date 2021/1/11
*/
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机名");
connectionFactory.setPort(5672);
// 绑定虚拟主机
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建连接通道
Channel channel = connection.createChannel();
// 绑定交换机
// 参数1:交换机的名字 参数2:交换机的类型
channel.exchangeDeclare("logs","fanout");
// 发送消息
// 参数1:交换机的名字 参数2:routingKey 参数3:队列消息的持久化设置 参数4:要发送的消息
channel.basicPublish("logs","",null,"fanout messages".getBytes());
// 关闭连接
channel.close();
connection.close();
}
}
消费者代码(创建了三个消费者,只展示其中一个,其他代码消费者相同):
package fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wmh
* @date 2021/1/11
*/
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("主机名");
connectionFactory.setPort(5672);
// 绑定虚拟主机
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建连接通道
Channel channel = connection.createChannel();
// 将通道绑定给交换机
channel.exchangeDeclare("logs","fanout");
// channel.exchangeBind
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将通道绑定给队列(通过通道将队列与交换机进行绑定)
// 参数1:队列名称 参数2:交换机名称 参数3:routingKey
channel.queueBind(queue,"logs","");
// 消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1消费了消息:"+new String(body));
}
});
}
}
运行结果: