RabbitMQ结合JavaAPI简单使用

总结

  • 消息消费者只需要明确从哪个消息队列获取消息
  • exchange创建后 不能再创建相同名字+不同模式的
  • 多个模式可结合使用 可以直接发到指定队列,也可以发到交换机由对应策略转发到对应队列

RabbitMQ Java原生api使用

RabbitMQ结合JavaAPI简单使用

1.HelloWorld 简单模式

添加Virtual Hosts

RabbitMQ结合JavaAPI简单使用

简单模式消息生产者

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置RabbitMQ服务主机地址,默认localhost
        factory.setHost("localhost");
        // 3.设置RabbitMQ服务端口,默认5672
        factory.setPort(5672);
        // 4.设置虚拟主机名字,默认/
        factory.setVirtualHost("/demo1");
        // 5.设置用户连接名,默认guest
//        factory.setUsername("guest");
        // 6.设置链接密码,默认guest
//        factory.setPassword("guest");
        // 7.创建一个新链接
        Connection connection = factory.newConnection();
        // 8.创建消息通道
        Channel channel = connection.createChannel();
        // 9.创建队列
        channel.queueDeclare("simple_queue",true,false,false,null);
        // 10.创建消息
        String msg="simple queue demo";
        // 11.消息发送
        channel.basicPublish("","simple_queue",null,msg.getBytes());
        // 12.关闭资源
        channel.close();
        connection.close();
    }
}

简单模式消息消费者

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建链接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 2.设置RabbitMQ服务主机地址,默认localhost
        factory.setHost("localhost");
        // 3.设置RabbitMQ服务端口,默认5672
        factory.setPort(5672);
        // 4.设置虚拟主机名字,默认/
        factory.setVirtualHost("/demo1");
        // 5.设置用户连接名,默认guest
        // 6.设置链接密码,默认guest
        // 7.创建一个新链接
        Connection connection = factory.newConnection();
        // 8.创建消息通道
        Channel channel = connection.createChannel();
        // 9.创建队列
        channel.queueDeclare("simple_queue",true,false,false,null);
        // 10.创建消费者,并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 路由
                String routingKey = envelope.getRoutingKey();
                // 交换机
                String exchange = envelope.getExchange();
                // 消息id
                long deliveryTag = envelope.getDeliveryTag();
                // 消息体
                String message = new String(body, "UTF-8");
                System.out.println("路由:" + routingKey + ",交换机:" + exchange + ",消息id:" + deliveryTag + ",消息体:" + message);
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        };
        // 11.消息监听
        channel.basicConsume("simple_queue", true, consumer);
        // 12.关闭资源(不建议关闭,建议一直监听消息)
    }
}

2.Work queues 工作队列模式

// 与上面一样,不过就是创建多个消息消费者去监听同一个消息队列,消息分配为轮询式

3.Publish/subscribe 发布订阅模式

此模式下呢多出一个概念exchange(交换机)可以在交换机上绑定多个消息队列,而现在消息产生者将消息发送到交换机,由交换机调度给他下面的消息队列,此模式下交换机将会将消息发给所有与他绑定的队列

工作队列模式消息生产者

	    // 9.创建队列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  创建交换机:arg0,交换机名称  arg1,交换机类型(广播)
        channel.exchangeDeclare("demo3Exchange", BuiltinExchangeType.FANOUT);
        // 将队列绑定到交换机
        channel.queueBind("simple_queue1","demo3Exchange","");
        channel.queueBind("simple_queue2","demo3Exchange","");
        // 10.创建消息  ....

工作队列模式消息消费者

	class Consumer1
	...
	// 11.监听 消息队列simple_queue1
        channel.basicConsume("simple_queue1", true, consumer);
        ...
            
    class Consumer2
    ...
    // 11.监听 消息队列simple_queue2
        channel.basicConsume("simple_queue2", true, consumer);
        ...

4.Routing 路由模式

此模式下多出概念路由,基于上一模式,上一模式交换机会将接收到的消息发给所有绑定了的队列,此模式下接收到的消息会多一个参数Routing,会将消息转发到对应Routing的队列

路由模式消息生产者

// 9.创建队列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  创建交换机:arg0,交换机名称  arg1,交换机类型(路由对应)
        channel.exchangeDeclare("demo4Exchange", BuiltinExchangeType.DIRECT);
        // 将队列绑定到交换机
        // simple_queue1只会接收到routingKey 为error的消息
        channel.queueBind("simple_queue1","demo4Exchange","error");
        // simple_queue2会接收到routingKey 为info,warning,error的消息
        channel.queueBind("simple_queue2","demo4Exchange","info");
        channel.queueBind("simple_queue2","demo4Exchange","warning");
        channel.queueBind("simple_queue2","demo4Exchange","error");
        // 10.创建消息
        // 11.消息发送
        for (int i = 0; i < 100; i++) {
            // 创建消息
            String message = "routing_key:" + i;
            String routingKey = "";
            if (i%2 == 0){ // routing_key_queue1、routing_key_queue2 0、2、4、6、8
                routingKey = "error";
            }else if (i%5 == 0){    // routing_key_queue2:5
                routingKey = "info";
            }else { // 0、1、5
                routingKey = "warning";
            }
            message += "--->" + routingKey;
            // 消息发送
            channel.basicPublish("demo4Exchange", routingKey, null, message.getBytes());
        }

路由模式消息消费者

// 同上 消息消费者只需要监听对应的消息队列即可

5.Topic 通配符模式

此模式下在上面的基础上将routingKey改为可以使用通配符的模式
通配符规则:
多个单词之间以”.”分割
‘#’:匹配一个或多个词
:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.
:只能匹配item.insert

通配符模式消息生产者

        // 9.创建队列
        channel.queueDeclare("simple_queue1",true,false,false,null);
        channel.queueDeclare("simple_queue2",true,false,false,null);
        //  创建交换机:arg0,交换机名称  arg1,交换机类型(主题)
        channel.exchangeDeclare("demo5Exchange", BuiltinExchangeType.TOPIC);
        // 将队列绑定到交换机
        channel.queueBind("simple_queue1","demo5Exchange","#");
        channel.queueBind("simple_queue2","demo5Exchange","www.#");
        channel.queueBind("simple_queue2","demo5Exchange","*.com");
        // 10.创建消息
        // 11.消息发送
        for (int i = 0; i < 100; i++) {
            // 创建消息
            String message = "routing_key:" + i;
            String routingKey = "";
            if (i%2 == 0){ // routing_key_queue1、routing_key_queue2 0、2、4、6、8
                routingKey = "www.baidu.com";
            }else if (i%5 == 0){    // routing_key_queue2:5
                routingKey = "jd.com";
            }else { // 0、1、5
                routingKey = "dqdwwfwevweevwe21e13r23dr2gerfdqw.dqefw122e.23f.2f.23.f.2.f2.f.24.ff2wf2qef.2";
            }
            message += "--->" + routingKey;

            // 消息发送
            channel.basicPublish("demo5Exchange", routingKey, null, message.getBytes());
        }

通配符模式消息消费者

// 同上 消息消费者只需要监听对应的消息队列即可
上一篇:Js | Each call to then returns a newly created promise object


下一篇:对vue中nextTick()的理解及使用场景说明