一、订阅模式类型:
1.1 订阅模式示例图:
-
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
-
C:消费者,消息的接受者,会一直等待消息到来。
-
Queue:消息队列,接收消息、缓存消息。
-
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
重点:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
1.2 订阅模式小结:
订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。
交换机类型:广播(fanout)、定向(direct)、通配符(topic)
交换机只做消息转发,自身不存储数据。
二、 Publish/Subscribe发布与订阅模式
2.1 发布订阅模式:
1)每个消费者监听自己的队列。
2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息
3)发布与订阅模式特点:一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播
2.2 需求:
编写生产者、消费者代码并测试了解Publish/Subscribe发布与订阅模式的特点
步骤:
- 生产者(发送10个消息)
- 创建连接;
- 创建频道;
- 声明交换机(fanout);
- 声明队列;
- 队列绑定到交换机;
- 发送消息;
- 关闭资源
- 消费者(至少两个消费者)
- 创建连接;
- 创建频道;
- 声明交换机;
- 声明队列;
- 队列绑定到交换机;
- 创建消费者;
- 监听队列;
代码示例-生产者:
package publishsubscribe;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
//发布与订阅模式 发送消息
public class Producer {
//交换机名称
static final String FANOUT_EXCHANGE = "fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
static final String QUEUE_NAME = "publish_queue";
public static void main(String[] args) throws Exception {
// 1. 创建连接
Connection connection = ConnectionUtil.getConnection();
// 2. 创建频道;
Channel channel = connection.createChannel();
// 3.声明交换机,参数1:交换机名称,参数2:交换机类型(FANOUT广播类型、direct 定向路由,topic 通配符)
channel.exchangeDeclare(FANOUT_EXCHANGE , BuiltinExchangeType.FANOUT);
// 4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
//5 队列绑定到交换机(参数1:队列名称 参数2:交换机 参数3:路由key)
channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
//6. 发送消息;
for(int i=1;i<=10;i++){
String message = "你好! publishSubscribe发布与订阅模式---"+i;
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("已发送消息:" + message);
}
//6. 关闭资源
channel.close();
connection.close();
}
}
代码示例-消费者:
package publishsubscribe;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
/**
* 发布与订阅模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
// 1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
// 2. 创建频道;
Channel channel = connection.createChannel();
//3 声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
// 4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//5 队列绑定到交换机上
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
// 6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者2---接收到的消息为:" + new String(body, "utf-8"));
}
};
// 6. 监听队列 (需要持续监听队列消息,所以不要关闭资源)
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer);
//不关闭资源,应该一直监听消息
// channel.close();
// connection.close();
}
}
三、发布与订阅模式小结:
发布与订阅模式:一个消息可以被多个消费者接收;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。