【README】
本文po出 mq的发布订阅模式,及代码示例;
【1】intro
1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;
2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;
5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;
6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;
【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)
生产者
/**
* 发布订阅模式生产者
* 本文发布订阅模式使用的交换机类型为广播 fanout
* @author tang rong
*/
public class PSProduer {
/** 交换机类型 */
static String FANOUT_EXCHANGE = "fanout_exchange";
/** 队列名称1 */
static String FANOUT_QUEUE_1 = "fanout_queue_1";
/** 队列名称1 */
static String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
Connection conn = RBConnectionUtil.getConn(); // 创建连接
Channel channel = conn.createChannel(); // 创建频道
/**
* 声明交换机
* 参数1-交换机名称
* 参数2-交换机类型(fanout, topic, direct, headers)
*/
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
/**
* 创建队列
* @param1 队列名称
* @param2 是否持久化队列
* @param3 是否独占本次连接
* @param4 是否在不使用的时候自动删除队列
* @param5 队列其他参数
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
/**
* 队列绑定交换机
*/
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
/**
* 发送消息
*/
long temp = 1;
for (int i = 0; i < 1000; i++) {
String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();
/**
* 参数1 交换机名称,没有指定则使用默认交换机 Default change
* 参数2 路由key,简单模式可以传递队列名称
* 参数3 消息其他属性
* 参数4 消息内容
*/
channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8"));
System.out.println("生产者发送消息" + msg);
}
System.out.println("=== 生产者消息发送完成");
/* 关闭资源 */
channel.close();
conn.close();
}
}
消费者1
/**
* 发布订阅模式消费者1
* @author tang rong
*/
public class PSConsumer1 {
/** 交换机类型 */
static String FANOUT_EXCHANGE = "fanout_exchange";
/** 队列名称1 */
static String FANOUT_QUEUE_1 = "fanout_queue_1";
public static void main(String[] args) throws Exception {
Connection conn = RBConnectionUtil.getConn(); // 创建连接
Channel channel = conn.createChannel(); // 创建队列
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
/**
* 创建队列
* 参数1 队列名称
* 参数2 是否持久化
* 参数3 是否独占本连接
* 参数4 是否在不使用的时候自动删除队列
* 参数5 队列其他参数
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
/**
* 队列绑定交换机
*/
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
/* 创建消费者,设置消息处理逻辑 */
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签,在 channel.basicConsume 可以指定
* @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
* @param properties 基本属性
* @param body 消息字节数组
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("=== 消费者1 start ===");
System.out.println("路由key=" + envelope.getRoutingKey());
System.out.println("交换机=" + envelope.getExchange());
System.out.println("消息id=" + envelope.getDeliveryTag());
String message = new String(body, "UTF-8");
System.out.println(String.format("消费者收到的消息【%s】", message));
System.out.println("=== 消费者1 end ===\n");
}
};
/**
* 监听消息
* 参数1 队列名称
* 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack;
* 参数3 消息接收后的回调
*/
channel.basicConsume(FANOUT_QUEUE_1, true, consumer);
}
}
消费者2
/**
* 发布订阅模式消费者
* @author tang rong
*/
public class PSConsumer2 {
/** 交换机类型 */
static String FANOUT_EXCHANGE = "fanout_exchange";
/** 队列名称1 */
static String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
Connection conn = RBConnectionUtil.getConn(); // 创建连接
Channel channel = conn.createChannel(); // 创建队列
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
/**
* 创建队列
* 参数1 队列名称
* 参数2 是否持久化
* 参数3 是否独占本连接
* 参数4 是否在不使用的时候自动删除队列
* 参数5 队列其他参数
*/
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
/**
* 队列绑定交换机
*/
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
/* 创建消费者,设置消息处理逻辑 */
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签,在 channel.basicConsume 可以指定
* @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送)
* @param properties 基本属性
* @param body 消息字节数组
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("=== 消费者2 start ===");
System.out.println("路由key=" + envelope.getRoutingKey());
System.out.println("交换机=" + envelope.getExchange());
System.out.println("消息id=" + envelope.getDeliveryTag());
String message = new String(body, "UTF-8");
System.out.println(String.format("消费者收到的消息【%s】", message));
System.out.println("=== 消费者2 end ===\n");
}
};
/**
* 监听消息
* 参数1 队列名称
* 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack;
* 参数3 消息接收后的回调
*/
channel.basicConsume(FANOUT_QUEUE_2, true, consumer);
}
}
【3】小结
1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机), 发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;
2)默认交换机
AMQP default