AMQP-RabbitMQ/3/发布订阅模式

3. 发布订阅模式 Publish/Subscribe - 全集监听fanout

一次向多个消费者发送消息

  • 图示
    AMQP-RabbitMQ/3/发布订阅模式

个人理解

  • 生产者定义Exchange,同时将Exchange的类型定义为fanout,并向该Exchange发送消息。
  • 消费者定义队列Queue,并将队列与该交换机进行绑定。之后交换机付负责将消息全量推送给每一个与之绑定的Queue

**RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。
相反,生产者只能向Exchange发送消息。Exchange所做的工作非常简单。一方面,它接收来自生产者的消息,另一方面将它们推送到队列。Exchange必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到多个队列吗?或者它应该被丢弃。其规则由交换类型定义 。**
有几种交换类型可供选择:direct ,topic ,headers andfanout

  • fanout: 将它接收到的消息广播到所有绑定到它的消息队列上。(忽略路由键routingKey)
  • 生产者 - 发布者
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-发布者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Publisher {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish(ExchangeTypeEnum.FANOUT.getExchangeName(), "", null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}
  • 消费者1 - 订阅者1
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), durable, false, false, null);
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 消费者2 - 订阅者2
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), durable, false, false, null);
        //定义交换器类型为fanout
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器进行绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 注意

    • 没有在发布者定义队列,而是定义了交换器Exchange。发布者将消息发送到Exchange,而不是Queue
    • 在订阅者端,每个订阅者定义了自己的消息队列,并且将自己的消息队列与Exchange进行绑定。则在每次发布者向相应的Exchange发送消息的时候,Exchange会将消息发送至订阅了该Exchange的队列。(即:每个订阅者收到的消息都是一样的)
  • 测试结果
>>> 订阅者1
[main] INFO mq.rabbit.ps.SubscriberOne - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]


>>> 订阅者2
[main] INFO mq.rabbit.ps.SubscriberTwo - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
上一篇:AMQP-RabbitMQ/5/主题模式


下一篇:谈谈个人关于程序开发中,“零配置”和“有配置”的看法