第1集 玩转RabbitMQ的路由模式和应用场景
简介:RabbitMQ的路由模式和应用场景
-
什么是rabbitmq的路由模式
- 文档:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
- 交换机类型是Direct
- 队列和交换机绑定,需要指定一个路由key( 也叫Bingding Key)
- 消息生产者发送消息给交换机,需要指定routingKey
- 交换机根据消息的路由key,转发给对应的队列
-
例子:日志采集系统 ELK
- 一个队列收集error信息-》告警
- 一个队列收集全部信息-》日常使用
第2集 RabbitMQ的路由模式代码实战
简介:RabbitMQ路由模式代码实战
- 消息生产者
xxxxxxxxxx
public class Send {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
/**
* 消息生产者不用过多操作,只需要和交换机绑定即可
*/
try (//创建连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
String error = "我是错误日志";
String info = "我是info日志";
String debug = "我是debug日志";
channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
}
}
}
- 消息消费者(两个节点)
xxxxxxxxxx
public class Recv1 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.13");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/xdclass1");
factory.setPort(5672);
//消费者一般不增加自动关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即广播类型
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//获取队列
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//自动确认消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}