RabbitMq- Publish/Subscribe订阅模式(三)

一、订阅模式类型:

1.1 订阅模式示例图:

RabbitMq- Publish/Subscribe订阅模式(三)

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接受者,会一直等待消息到来。

  • Queue:消息队列,接收消息、缓存消息。

  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    Fanout:广播,将消息交给所有绑定到交换机的队列

    Direct:定向,把消息交给符合指定routing key 的队列

    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

重点:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1.2 订阅模式小结

订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。

交换机类型:广播(fanout)、定向(direct)、通配符(topic)

交换机只做消息转发,自身不存储数据。

二、 Publish/Subscribe发布与订阅模式

2.1 发布订阅模式:

1)每个消费者监听自己的队列。

2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息

3)发布与订阅模式特点:一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播

RabbitMq- Publish/Subscribe订阅模式(三)

2.2 需求:

编写生产者、消费者代码并测试了解Publish/Subscribe发布与订阅模式的特点

步骤:

  • 生产者(发送10个消息)
    1. 创建连接;
    2. 创建频道;
    3. 声明交换机(fanout);
    4. 声明队列;
    5. 队列绑定到交换机;
    6. 发送消息;
    7. 关闭资源
  • 消费者(至少两个消费者)
    1. 创建连接;
    2. 创建频道;
    3. 声明交换机;
    4. 声明队列;
    5. 队列绑定到交换机;
    6. 创建消费者;
    7. 监听队列;

代码示例-生产者:

package publishsubscribe;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

//发布与订阅模式 发送消息

public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    static final String QUEUE_NAME = "publish_queue";

    public static void main(String[] args) throws Exception  {
        // 1. 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 2. 创建频道;
        Channel channel = connection.createChannel();
        // 3.声明交换机,参数1:交换机名称,参数2:交换机类型(FANOUT广播类型、direct 定向路由,topic 通配符)
        channel.exchangeDeclare(FANOUT_EXCHANGE , BuiltinExchangeType.FANOUT);
//   4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        //5 队列绑定到交换机(参数1:队列名称  参数2:交换机  参数3:路由key)
        channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");

            //6. 发送消息;
        for(int i=1;i<=10;i++){
                String message = "你好! publishSubscribe发布与订阅模式---"+i;
                /**
                 * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
                 * 参数2:路由key,简单模式中可以使用队列名称
                 * 参数3:消息其它属性
                 * 参数4:消息内容
                 */
                channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
                System.out.println("已发送消息:" + message);

            }
            //6. 关闭资源
            channel.close();
            connection.close();
            }

}

代码示例-消费者:

package publishsubscribe;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;


/**
 * 发布与订阅模式;消费者接收消息
 */
public class Consumer2 {
    public static void main(String[] args) throws Exception {
//        1. 创建连接;(抽取一个获取连接的工具类)
        Connection connection = ConnectionUtil.getConnection();
//        2. 创建频道;
        Channel channel = connection.createChannel();
        //3 声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//        4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
        //5 队列绑定到交换机上
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
//        6. 创建消费者(接收消息并处理消息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("消费者2---接收到的消息为:" + new String(body, "utf-8"));
            }
        };
//        6. 监听队列   (需要持续监听队列消息,所以不要关闭资源)
        /**
         * 参数1:队列名
         * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
         * 如果设置为false则需要手动确认
         * 参数3:消费者
         */
        channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer);
        //不关闭资源,应该一直监听消息
        // channel.close();
        // connection.close();


    }

}

三、发布与订阅模式小结:

发布与订阅模式:一个消息可以被多个消费者接收;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。

上一篇:Warning: Subscription For Node Down Event Still Pending


下一篇:【山大智云项目日志】Seahub+Proset分析(9)