1.背景
了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的。但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改exchange上面,就会接收到所有的消息。
使用过程一般就是先new 出一个Fanout类型的交换机,然后往这个交换机上绑定多个队列queue,不同的消费者各自监听不同的队列,这就实现了广播效果,因为同一个消息,会分发到所有队列中。
举个例子:
应用A监听了队列A,应用B监听了队列B,Fanout类型交换机同时绑定了队列A和B.假设生产者端发送了一条消息到Fanout类型交换机,交换机就会把消息分发到所有队列,这时应用A和应用B会收到同一条消息,这就是广播。
说了上面一大堆,只是为了强调,对于RabbitMQ的原本Fanout模式,它的设计就是多个消费者必须监听不同的队列,多个消费者之间才会形成广播关系。
那么问题来了,假如在Fanout工作模式下,多个消费者同时监听的是同一个队列,会怎样?实践过的同学应该都知道,这种情况下,这些消费者会形成竞争关系,现象是同一个消息只会被其中一个消费者接收,达不到广播的效果。。
2.需求
假如现在有一个需求,要做到对同一个应用的多个节点进行广播,怎么实现?
注意,这里所说的同一个应用多个节点,通俗点理解就是一个war包,布在多个服务器节点上。
在实际部署集群时,为了高可用,同一个应用可能会部署多个节点,那假如工程里已经通过配置定义某个队列,那多个节点它们定义的队列就会是相同的,那按照上面的背景,那这些节点间肯定就会存在竞争关系,即便是Fanout模式的交换机,一条消息也只能被其中一个节点接收,其他节点收不到,达不到广播的效果。那该如何做?
相信看到这里,有人会问,为何会有 对同一个应用的多个节点进行广播的需求场景?为什么要有这个需求。生产中的业务系统很多,自然而然场景就很多。
举两个经典的例子:
1.想要同时刷新所有节点的缓存
业务系统离不开缓存,有时会用内存缓存,假如我要刷新所有节点的内存缓存,多个节点前可能有负载均衡例如nginx之类的,我只需要访问其中一个节点,然后让这个节点做广播通知所有其他节点刷缓存。(广播刷缓存)
2.websocket会话寻找
websocket是比较受欢迎的实时消息推送方案。用过websocket应该知道,websocket只能与多个节点中的其中一个节点做长连接会话保持,也就是说用户的会话只会存在于一个节点上,假设服务端要主动向用户推一条消息,必须要知道用户的会话在哪个节点上,怎么得知?可以通过广播,通过消息广播,把消息发到多个节点上,然后节点收到消息只需要判断用户会话是否就在本节点上,假如在则主动推消息,不在,则丢弃这条消息。
类似上面这两种需求,就需要用到广播,并且是对同一个应用的多个节点进行广播。当然不用广播肯定也有其他通知方案,本文我们只讨论用MQ怎么做到。
3.思路
假如继续用RabbitMQ的Fanout模式,怎么做到对同一个应用的多个节点进行广播?
要起到广播效果,关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享?可以从这两个方向解决这个问题。
方法可能很多种,在这里,我只描述两种比较容易实现的方案。
方案1
过程大致如下
-
应用启动,多个节点监听同一个队列(此时多个节点是竞争关系,一条消息只会发到其中一个节点上)
-
消息生产者发送消息,同一条消息只被其中一个节点收到
-
收到消息的节点通过redis的发布订阅模式来通知其他兄弟节点
这种方案是最容易想到的,思路就是依赖其他组件来做消息共享,例如redis这种可以替换成其他方案,只要能做到消息共享就行,那么最终的效果就肯定是广播效果了。
方案2
过程大致如下
-
应用启动,利用监听器生成唯一ID
-
生成的唯一ID,通过文件写入的方式写到配置文件中
-
spring启动,把这个唯一ID加载为全局属性(为何要用唯一ID,就是为了用这个ID作为该节点的监听队列名,当然前缀可以用相同的,后缀用唯一ID区分即可,举个例子就是:节点1监听队列 kunghsu-123 节点2监听队列 kunghsu-456.必须保证它们的唯一ID是唯一的,不然还是会存在竞争关系)
-
多个节点监听了多个队列(让每个队列名都不同,目的就是让他们不存在竞争关系,没有竞争关系就不用做消息共享,只管由MQ分发即可,这时同一条消息就会发到多个节点上)
-
到MQ控制台,将所有节点生成的队列手动绑定到指定的Fanout交换机上(这一步是手动的,当然也可以通过API做到,下面会说到)
-
生产者发送消息指定的Fanout交换机,交换机将同一条消息被分发到多个节点上
-
广播效果达成!
这种方案,也比较容易。这样做,就是为了让多个节点间是广播关系。总的来说不麻烦,其中第五步手动操作其实有点挫,这种手动操作步骤其实是应该转成自动化,让应用程序来完成,方便以后自动化建设。
这种方案的spring配置也比较简单,参考Fanout模式的配置即可。本文重点在这个思路的实现过程。
只列举部分代码如下:
消息生产者
<!-- 只申明交换机,不定义队列 -->
<rabbit:fanout-exchange name="exchangeFour" durable="true" auto-delete="false" >
</rabbit:fanout-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate4" connection-factory="connectionFactory2"
exchange="exchangeFour" />
消息消费者
<rabbit:queue name="${queue-name-fanout}" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
<bean id="fanoutTwoConsumer" class="com.lunch.foo.rabbitmq.FanoutTwoConsumer"></bean>
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="${queue-name-fanout}" ref="fanoutOneConsumer" />
</rabbit:listener-container>
另外,RabbitMQ的客户端API支持让我们 将队列绑定到指定的交换机上。具体可参考我的工具类代码。
代码如下:
package com.lunch.foo.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xuyaokun On 2019/3/10 2:26
* @desc:
*/
public class RabbitMQUtil {
private static final String HOST = "192.168.3.128";
private static final int PORT = AMQP.PROTOCOL.PORT;
private static final String USERNAME = "kunghsu";
private static final String PASSWORD = "123456";
private static final String VIRTUALHOST = "/";
public static void main(String[] args) {
String QUEUE_NAME = "queueOneX";
String EXCHANGE_NAME = "exchangeFour";
try {
queueBind(EXCHANGE_NAME, QUEUE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* 获取会话链接
*
* @return
* @throws IOException
* @throws TimeoutException
*/
private static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUALHOST);
return factory.newConnection();
}
/**
* 绑定队列到指定交换机
*
* @param exchangeName
* @param queueName
* @throws IOException
* @throws TimeoutException
*/
public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {
Channel channel = null;
try{
channel = getConnection().createChannel();
} catch(Exception e){
System.out.println("获取RabbitMQ会话连接失败!取消做队列绑定。");
return ;
}
//默认持久化
channel.queueDeclare(queueName, true, false, false, null);
// 声明交换机:指定交换机的名称和类型(广播:fanout)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);
// 在消费者端队列绑定
channel.queueBind(queueName, exchangeName, "");
channel.close();
}
}
总结
RabbitMQ的Fanout模式相关的文章,网上一抓一大把,但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播。希望通过这篇文章,能帮助到有需要的朋友。