rabbitmq-发布订阅模式

【README】

本文po出 mq的发布订阅模式,及代码示例;

 

【1】intro

1) 角色: 有4个角色, 包括 生产者,消费者, 交换机 exchange(X), 队列;

rabbitmq-发布订阅模式

2)交换机: 一方面,接收生产者的消息,另一方面,处理消息,如发送给队列,或丢弃;这取决于 exchange类型;
3)exchange类型有如下3种:
fanout 广播, 把消费转发给所有 绑定到该交换机的所有队列;
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
topic 通配符, 把消息交给 routing pattern(路由模式)的队列;
4)exchange 交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;

5)发布订阅模式:
5.1-每个消费者监听自己的队列;
5.2-生产者把消息发送给 broker, 由交换机把消息转发到绑定此交换机的所有队列;

6)交换机需要与队列绑定, 绑定之后,一个消息可以被多个消费者收到;

【2】代码(生产者1个,交换机exchange1个,但对应到2个队列,即消息有2个replication)

生产者


/**
 * 发布订阅模式生产者
 * 本文发布订阅模式使用的交换机类型为广播 fanout 
 * @author tang rong 
 */
public class PSProduer {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_1 = "fanout_queue_1";
	/** 队列名称1 */
	static String FANOUT_QUEUE_2 = "fanout_queue_2";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接
		Channel channel = conn.createChannel();  // 创建频道
		/**
		 * 声明交换机
		 * 参数1-交换机名称 
		 * 参数2-交换机类型(fanout, topic, direct, headers)
		 */
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
		/**
		 * 创建队列
		 * @param1 队列名称
		 * @param2  是否持久化队列
		 * @param3 是否独占本次连接 
		 * @param4 是否在不使用的时候自动删除队列 
		 * @param5 队列其他参数  
		 */ 
		channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
		channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
		/**
		 * 队列绑定交换机 
		 */
		channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, 	"");
		channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, 	"");
		/**
		 * 发送消息 
		 */
		long temp = 1; 
		for (int i = 0; i < 1000; i++) { 
			String msg = "发布订阅模式消息,序号=" + (temp+i) + "时间=" + MyDateUtil.getNow();
			/**
			 * 参数1 交换机名称,没有指定则使用默认交换机 Default change 
			 * 参数2 路由key,简单模式可以传递队列名称 
			 * 参数3 消息其他属性 
			 * 参数4 消息内容 
			 */
			channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes("UTF-8")); 
			System.out.println("生产者发送消息" + msg);  
		}  
		System.out.println("=== 生产者消息发送完成");
		/* 关闭资源 */
		channel.close();
		conn.close(); 
	}
}

消费者1


/**
 * 发布订阅模式消费者1
 * @author tang rong 
 */
public class PSConsumer1 {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_1 = "fanout_queue_1";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接 
		Channel channel = conn.createChannel();  // 创建队列 
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
		/**
		 * 创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
		channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
		/**
		 * 队列绑定交换机
		 */
		channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/**
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者1 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者1 end ===\n"); 
			} 
		};
		/**
		 * 监听消息
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(FANOUT_QUEUE_1, true, consumer); 
	}
	
}

消费者2

/**
 * 发布订阅模式消费者
 * @author tang rong 
 */
public class PSConsumer2 {
	/** 交换机类型 */
	static String FANOUT_EXCHANGE = "fanout_exchange";
	/** 队列名称1 */
	static String FANOUT_QUEUE_2 = "fanout_queue_2";
	
	public static void main(String[] args) throws Exception {
		Connection conn = RBConnectionUtil.getConn(); // 创建连接 
		Channel channel = conn.createChannel();  // 创建队列 
		channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); // 创建交换机
		/**
		 * 创建队列 
		 * 参数1 队列名称 
		 * 参数2 是否持久化
		 * 参数3 是否独占本连接 
		 * 参数4 是否在不使用的时候自动删除队列
		 * 参数5 队列其他参数 
		 */
		channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
		/**
		 * 队列绑定交换机
		 */
		channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
		
		/* 创建消费者,设置消息处理逻辑 */
		Consumer consumer = new DefaultConsumer(channel) {
			/**
			 * @param consumerTag 消费者标签,在 channel.basicConsume 可以指定   
			 * @param envelope 消息包内容,包括消息id,消息routingkey,交换机,消息和重转标记(收到消息失败后是否需要重新发送) 
			 * @param properties 基本属性
			 * @param body 消息字节数组  
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				System.out.println("=== 消费者2 start ===");
				
				System.out.println("路由key=" + envelope.getRoutingKey());
				System.out.println("交换机=" + envelope.getExchange());
				System.out.println("消息id=" + envelope.getDeliveryTag()); 
				String message = new String(body, "UTF-8");
				System.out.println(String.format("消费者收到的消息【%s】", message)); 
				System.out.println("=== 消费者2 end ===\n"); 
			} 
		};
		/**
		 * 监听消息
		 * 参数1 队列名称 
		 * 参数2 是否自动确认, 设置为true表示消息接收到自动向 mq回复ack;mq收到ack后会删除消息; 设置为false则需要手动发送ack; 
		 * 参数3 消息接收后的回调 
		 */
		channel.basicConsume(FANOUT_QUEUE_2, true, consumer); 
	}
	
}

 

【3】小结

1)发布订阅模式与工作模式的区别;
区别1)工作队列模式不需要定义交换机, 发布订阅模式需要;
区别2)工作队列模式的生产者向队列发送消息(底层使用默认交换机),  发布订阅模式的生产者向交换机发送消息;
区别3)工作队列模式的队列不需要与交换机绑定(底层与默认交换机绑定), 发布订阅模式中的队列需要与交换机绑定;

2)默认交换机

AMQP default

rabbitmq-发布订阅模式

 

 

 

 

 

 

 

 

上一篇:springboot2.x 接入阿里云市场短信发送


下一篇:苏州办毕业证j