第八章 玩转RabbitMQ 路由、主题模式实战和总结

第1集 玩转RabbitMQ的路由模式和应用场景

简介:RabbitMQ的路由模式和应用场景

  • 什么是rabbitmq的路由模式

    • 文档:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
    • 交换机类型是Direct
    • 队列和交换机绑定,需要指定一个路由key( 也叫Bingding Key)
    • 消息生产者发送消息给交换机,需要指定routingKey
    • 交换机根据消息的路由key,转发给对应的队列

第八章 玩转RabbitMQ 路由、主题模式实战和总结

  • 例子:日志采集系统 ELK

    • 一个队列收集error信息-》告警
    • 一个队列收集全部信息-》日常使用

 

 

 

 

 

 

 

 

 

 

 

 

第2集 RabbitMQ的路由模式代码实战

简介:RabbitMQ路由模式代码实战

  • 消息生产者
     
xxxxxxxxxx
       
public class Send {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/xdclass1");
        factory.setPort(5672);
        /**
         * 消息生产者不用过多操作,只需要和交换机绑定即可
         */
        try (//创建连接
             Connection connection = factory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()) {
            //绑定交换机,直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
            String error = "我是错误日志";
            String info = "我是info日志";
            String debug = "我是debug日志";
            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息发送成功");
        }
    }
}
   

 

  • 消息消费者(两个节点)
     
xxxxxxxxxx
       
public class Recv1 {
    private final static String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.211.55.13");
        factory.setUsername("admin");
        factory.setPassword("password");
        factory.setVirtualHost("/xdclass1");
        factory.setPort(5672);
        //消费者一般不增加自动关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
        channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
        channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //自动确认消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
   

 

 

第八章 玩转RabbitMQ 路由、主题模式实战和总结

第八章 玩转RabbitMQ 路由、主题模式实战和总结

上一篇:设计模式-工厂模式(Factory )


下一篇:Direct类型交换机