ActiveMQ 高可用集群安装、配置、高可用测试( ZooKeeper + LevelDB)
(ZooKeeper + LevelDB)
从 ActiveMQ 5.9 开始,ActiveMQ 的集群实现方式取消了传统的 Master-Slave 方式,增加了基于 ZooKeeper + LevelDB 的 Master-Slave 实现方式,其他两种方式目录共享和数据库共享依然存在。
三种集群方式的对比:
(1)基于共享文件系统(KahaDB,默认):
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
在单主机上可以,但是在多主机上就需要引入分布式文件系统。
(2)基于 JDBC:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/amq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="20"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds"
createTablesOnStartup="false"/>
</persistenceAdapter>
(3)基于可复制的 LevelDB(本教程采用这种集群方式
):
LevelDB 是 Google 开发的一套用于持久化数据的高性能类库
。LevelDB 并不是一种服务,用户需要自行实现 Server。是单进程的服务,能够处理十亿级别规模 Key-Value 型数据,占用内存小。
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62621"
zkAddress="localhost:2181,localhost:2182,localhost:2183"
hostname="localhost"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>
本节课程主要讲解基于 ZooKeeper 和 LevelDB 搭建 ActiveMQ 集群。集群仅提供主备方式的高可用集群功能,避免单点故障,没有负载均衡功能。
官方文档:http://activemq.apache.org/replicated-leveldb-store.html
集群原理图:
高可用的原理:使用 ZooKeeper(集群)注册所有的 ActiveMQ Broker。只有其中的一个 Broker 可以提供服务,被视为 Master,其他的 Broker 处于待机状态,被视为 Slave(从节点)。如果 Master 因故障而不能提供服务, ZooKeeper 会从 Slave 中选举出一个 Broker 充当 Master。
Slave 连接 Master 并同步他们的存储状态,Slave 不接受客户端连接。所有的存储操作都将被复制到连接至 Master 的 Slaves。如果 Master 宕了,得到了最新更新的 Slave 会成为 Master。
故障节点在恢复后会重新加入到集群中并连接 Master 进入 Slave 模式。
所有需要同步的 disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了 replicas=3,那么法定大小是(3/2)+1=2。Master 将会存储并更新然后等待 (2-1)=1 个 Slave 存储和更新完成,才汇报 success。至于为什么是 2-1,熟悉 Zookeeper 的应该知道,有一个 node 要作为观擦者存在。当一个新的 Master 被选中,你需要至少保障一个法定 node 在线以能够找到拥有最新状态的 node。这个 node 可以成为新的 Master。因此,推荐运行至少 3 个 replica nodes,以防止一个 node 失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式类似
)
1、ActiveMQ 集群部署规划:
环境:CentOS 6.6 x64 、 JDK7
版本:ActiveMQ 5.11.1
ZooKeeper 集群环境:192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183
(ZooKeeper 集群部署请参考《高可用架构篇–第 01 节–ZooKeeper 集群的安装、配置、高可用测试》)
主机 | 集群端口 | 消息端口 | 管控台端口 | 节点安装目录 |
---|---|---|---|---|
192.168.1.81 | 62621 | 51511 | 8161 | /home/wusc/activemq/node-01 |
192.168.1.82 | 62622 | 51512 | 8162 | /home/wusc/activemq/node-02 |
192.168.1.83 | 62623 | 51513 | 8163 | /home/wusc/activemq/node-03 |
2、防火墙打开对应的端口
3、分别在三台主机中创建/home/wusc/activemq 目录
$ mkdir /home/wusc/activemq
上传 apache-activemq-5.11.1-bin.tar.gz 到/home/wusc/activemq
目录
4、解压并按节点命名
$ cd /home/wusc/activemq
$ tar -xvf apache-activemq-5.11.1-bin.tar.gz
$ mv apache-activemq-5.11.1 node-0X #(X 代表节点号 1、2、3,下同)
5、修改管理控制台端口(默认为 8161)可在 conf/jetty.xml 中修改,如下:
Node-01 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
注意修改:,下面同样的
Node-02 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
Node-03 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
6、集群配置:
在 3 个 ActiveMQ 节点中配置 conf/activemq.xml 中的持久化适配器。修改其中 bind、zkAddress、hostname 和 zkPath。注意:每个 ActiveMQ 的 BrokerName 必须相同,否则不能加入集群。
Node-01 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu" dataDirectory="${activemq.data}">
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"//节点数,最好也是单数的
bind="tcp://0.0.0.0:62621" //集群通讯的端口,并不是消息发送的端口
//zookeeper集群的配置
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183"
hostname="edu-zk-01"//做了host映射
zkPath="/activemq/leveldb-stores"//zookeeper中的路径,下面会演示
/>
</persistenceAdapter>
</broker>
这里主要注意三个节点的brokerName=“DubboEdu” 必须相同。
主要配置
replicas="3"//节点数,最好也是单数的
bind="tcp://0.0.0.0:62621" //集群通讯的端口,并不是消息发送的端口
//zookeeper集群的配置
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183"
hostname="edu-zk-01"//做了host映射
zkPath="/activemq/leveldb-stores"//zookeeper中的路径,下面会演示
/>
Node-02 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu" dataDirectory="${activemq.data}">
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62622" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-zk-02"
zkPath="/activemq/leveldb-stores" />
</persistenceAdapter>
</broker>
Node-03 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu" dataDirectory="${activemq.data}">
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62623" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-zk-03"
zkPath="/activemq/leveldb-stores" />
</persistenceAdapter>
</broker>
修改各节点的消息端口(注意,避免端口冲突
):
Node-01 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:51511?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
注意这里主要修改 51511 ,下面同样。
Node-02 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:51512?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Node-03 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:51513?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
7、按顺序启动 3 个 ActiveMQ 节点:
$ /home/wusc/activemq/node-01/bin/activemq start
$ /home/wusc/activemq/node-02/bin/activemq start
$ /home/wusc/activemq/node-03/bin/activemq start
监听日志:
$ tail -f /home/wusc/activemq/node-01/data/activemq.log
$ tail -f /home/wusc/activemq/node-02/data/activemq.log
$ tail -f /home/wusc/activemq/node-03/data/activemq.log
8、集群的节点状态分析:
集群启动后对 ZooKeeper 数据的抓图,可以看到 ActiveMQ 的有 3 个节点,分别是 00000000000,00000000001,00000000002。
以下第一张图展现了 00000000000 的值,可以看到 elected 的值是不为空,说明这个节点是 Master,其他两个节点是 Slave。
双击zookeeper-dev-ZooInspector.jar即可打开
在Connect String里面输入类似192.168.1.32:2181这种ip+端口的形式
上面持久化配置中的zkPath="/activemq/leveldb-stores" />就是这里存放的路径
注意上面语句中的id,就是我们上面配置的brokerName=“DubboEdu”,只有相同,才能确保在一个几区里面。
9、集群可用性测试(配置和测试代码,请看视频):
ActiveMQ 的客户端只能访问 Master 的 Broker,其他处于 Slave 的 Broker 不能访问。所以客户端连接 Broker 应该使用 failover 协议
failover(tcp://192.168.1.81:51511,tcp://192.168.1.82:51512,tcp://192.168.1.83:51513)?randomize=false
mq.properties(注意生产者,消费者都要配置)
my.brockerURL=failover(tcp://192.168.1.81:51511,tcp://192.168.1.82:51512,tcp://192.168.1.83:51513)?randomize=false
分别启动消费者和生产者。
10、集群高可用测试(请看视频):
当一个 ActiveMQ 节点挂掉,或者一个 ZooKeeper 节点挂掉,ActiveMQ 服务依然正常运转。如果仅剩一个 ActiveMQ 节点,因为不能选举 Master,ActiveMQ 不能正常运转;同样的,如果 ZooKeeper 仅剩一个节点活动,不管 ActiveMQ 各节点是否存活,ActiveMQ 也不能正常提供服务。(ActiveMQ 集群的高可用,依赖于 ZooKeeper 集群的高可用。
)
一开始第一个节点是主节点,将第一个节点关停,第二个节点变成主节点,再将第二个节点关掉,报错java.io.EOFException,只剩下一个节点的时候,推举不出来master节点,当再次开启第一个节点的时候,又自动开始运行,最新状态的节点是主节点,所以第一个节点是主节点。将zookeeper关掉一个node1,没有影响,再关掉节点node2,还是会报错java.io.EOFException,单纯的运行zookeeper的start是没用的,需要将activeMq进行重启,才能重新发送消息。
注意在eclipse的console中可能出现警告,并不是错误,只是activeMq主节点的推荐数据同步的一个过程,就算是短时间的闪断在业务里面不会有太大的影响。
解释:
所有需要同步的 disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了 replicas=3,那么法定大小是(3/2)+1=2。Master 将会存储并更新然后等待 (2-1)=1 个 Slave 存储和更新完成,才汇报 success。至于为什么是 2-1,熟悉 Zookeeper 的应该知道,有一个 node 要作为观擦者存在。当一个新的 Master 被选中,你需要至少保障一个法定 node 在线以能够找到拥有最新状态的 node。这个 node 可以成为新的 Master。因此,推荐运行至少 3 个 replica nodes,以防止一个 node 失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式类似)
11、设置开机启动:
# vi /etc/rc.local
su - wusc -c '/home/wusc/activemq/node-01/bin/activemq start'
su - wusc -c '/home/wusc/activemq/node-02/bin/activemq start'
su - wusc -c '/home/wusc/activemq/node-03/bin/activemq start'
12、配置优化(可选
):
updateURIsURL,通过 URL(或者本地路径)获取重连的 url,这样做具有良好的扩展性,因为客户端每次连接都是从 URL(或文件)中加载一次,所以可以随时从文件中更新 url 列表,做到动态添加 MQ 的备点。 failover:()?randomize=false&updateURIsURL=file:/home/wusc/activemq/urllist.txt urllist.txt 中的地址通过英文逗号分隔,示例:
tcp://192.168.1.81:51511,tcp://192.168.1.82:51512,tcp://192.168.1.83:51513
最后,附上官方文档的一则警告,请使用者注意。replicatedLevelDB 不支持延迟或者计划任务消息。这些消息存储在另外的 LevelDB 文件中,如果使用延迟或者计划任务消息,将不会复制到 slave Broker 上,不能实现这种消息的高可用。
高可用+负载均衡实现介绍(下一节):
Broker-Cluster 可以解实现载均衡,但当其中一个 Broker 突然宕掉的话,那么存在于该 Broker 上处于 Pending 状态的 message 将会丢失,无法达到高可用的目的。
但是ZooKeeper + LevelDB 是会实现数据的持久化的,保证数据的不丢失。
将集群中的三个机器都宕掉后:
然后再恢复,发现数据还是在的。
Master-Slave 与 Broker-Cluster 相结合的部署。
ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试
上面的这种方法只有master节点在承担主要的工作,实现可高可用,但是没有实现负载均衡。
准备
这里首先再搭建一套和上面一样的集群(伪集群),没有什么区别,只是为了节省资源而已,主要修改的地方是:
brokerName=“DubboEdu”
消息端口设置<transportConnector name="openwire" uri="tcp://0.0.0.0:51513?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
zkPath="/activemq/leveldb-stores"//zookeeper中的路径。
项目中的配置文件
步骤:
ActiveMQ 高可用集群安装、配置(伪集群) (ZooKeeper + LevelDB)
1、ActiveMQ 集群部署规划:
环境:CentOS 6.6 x64 、 JDK7
版本:ActiveMQ 5.11.1
ZooKeeper 集群环境:192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183
(ZooKeeper 集群部署请参考《高可用架构篇–第 01 节–ZooKeeper 集群的安装、配置、高可用测试》)
主机 | 集群端口 | 消息端口 | 管控台端口 | 节点安装目录 |
---|---|---|---|---|
192.168.1.101 | 63631 | 53531 | 8361 | /home/wusc/activemq/node-01 |
192.168.1.101 | 63632 | 53532 | 8362 | /home/wusc/activemq/node-02 |
192.168.1.101 | 63633 | 53533 | 8363 | /home/wusc/activemq/node-03 |
2、防火墙打开对应的端口
## mq cluster
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8361 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8362 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8363 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 53531 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 53532 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 53533 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 63631 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 63632 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 63633 -j ACCEPT
-A INPUT -m state --state NEW -m tcp -p tcp --dport 63631 -j ACCEPT
3、分别在三台主机中创建/home/wusc/activemq 目录
$ mkdir /home/wusc/activemq
上传 apache-activemq-5.11.1-bin.tar.gz 到/home/wusc/activemq 目录
4、解压并按节点命名
$ cd /home/wusc/activemq
$ tar -xvf apache-activemq-5.11.1-bin.tar.gz
$ mv apache-activemq-5.11.1 node-0X #(X 代表节点号 1、2、3,下同)
5、修改管理控制台端口(默认为 8161)可在 conf/jetty.xml 中修改,如下:
Node-01 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8361"/>
</bean>
Node-02 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8362"/>
</bean>
Node-03 管控台端口:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8363"/>
</bean>
6、集群配置:
在 3 个 ActiveMQ 节点中配置 conf/activemq.xml 中的持久化适配器。修改其中 bind、zkAddress、hostname 和 zkPath。注意:每个 ActiveMQ 的 BrokerName 必须相同,否则不能加入集群。
Node-01 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu2" dataDirectory="${activemq.data}"> <persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01"
zkPath="/activemq2/leveldb-stores" />
</persistenceAdapter>
</broker>
Node-02 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu2" dataDirectory="${activemq.data}">
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ --> <replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63632" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01"
zkPath="/activemq2/leveldb-stores" />
</persistenceAdapter>
</broker>
Node-03 中的持久化配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="DubboEdu2" dataDirectory="${activemq.data}">
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ --> <replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63633" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01"
zkPath="/activemq2/leveldb-stores" />
</persistenceAdapter>
</broker>
修改各节点的消息端口(注意,避免端口冲突):
Node-01 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53531?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Node-02 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53532?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Node-03 中的消息端口配置:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53533?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
7、按顺序启动 3 个 ActiveMQ 节点:
$ /home/wusc/activemq/node-01/bin/activemq start
$ /home/wusc/activemq/node-02/bin/activemq start
$ /home/wusc/activemq/node-03/bin/activemq start
8、集群的节点状态分析:
集群启动后对 ZooKeeper 数据的抓图,可以看到 ActiveMQ 的有 3 个节点,分别是 00000000000,00000000001,00000000002。
以下第一张图展现了 00000000000 的值,可以看到 elected 的值是不为空,说明这个节点是 Master,其他两个节点是 Slave。
9、集群可用性测试(配置和测试代码,请看视频):
ActiveMQ 的客户端只能访问 Master 的 Broker,其他处于 Slave 的 Broker 不能访问。所以客户端连
接 Broker 应该使用 failover 协议。
failover:(tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533)?randomize=false
10、集群高可用测试:
当一个 ActiveMQ 节点挂掉,或者一个 ZooKeeper 节点挂掉,ActiveMQ 服务依然正常运转。如果仅剩一个 ActiveMQ 节点,因为不能选举 Master,ActiveMQ 不能正常运转;同样的,如果 ZooKeeper 仅剩一个节点活动,不管 ActiveMQ 各节点是否存活,ActiveMQ 也不能正常提供服务。
11、设置开机启动:
# vi /etc/rc.local
su - wusc -c '/home/wusc/activemq/node-01/bin/activemq start'
su - wusc -c '/home/wusc/activemq/node-02/bin/activemq start'
su - wusc -c '/home/wusc/activemq/node-03/bin/activemq start'
正式开始
参考文章:https://blog.csdn.net/kimmking/article/details/8440150/
上面的ActiveMQ集群的安装、配置、高可用测试,是Master Slave的方式,下面开始是一种NetWorks of brockers
在activeMq中的配置文件(active.xml)中加入:
即:集群1中加入集群2的ip端口,进群2中加入进群1的ip端口。(这里的集群1和集群2是上面配置的两个集群,包括一个真实进群和一个伪集群,一共六个节点)
主要配置(具体内容,请看视频):
集群 1 链接集群 2:
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533)" duplex="false"/>
</networkConnectors>
集群 2 链接集群 1:
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.1.81:51511,tcp://192.168.1.82:51512,tcp://192.168.1.83:51513)" duplex="false"/>
</networkConnectors>
重启两套集群的所有节点
这里集群A和集群B之间相当于一个网桥。
上图是具体项目中的演示:
A B 是我们上面搭建的两个集群
P-a是项目中连接A集群的生产者类
P-b是项目中连接B集群的生产者类
C-a是项目中连接A集群的消费者类
C-a是项目中连接A集群的消费者类
将两个消费者启动:
上图是集群一中的master节点的管控台(注意,三个节点中只有master节点是真正提供数据的,所以只有这个管控控台能看到数据),可以看到队列有两个消费者。
打开集群而的管控台,也是同样的效果。
启动生产者a:
发现生产的数据两个消费者都消费了一部分。
启动生产者b
发现消费者b消费生产者a的数据,而消费者a消费生产者a和生产者b的数据。
发现代码中线程的压力不一样,一个sleep500,一个sleep300,一个压力大,其他的就会去帮助消费。
关闭消费者b
发现,消费者a同时消费生产者a和生产者b的数据
以上:说明是能够达到负载均衡的。
两个消费者同时连接到集群a,
将第一套集群的master节点关掉
会总动切换
上面是在切换过程中会出现的问题
切换完成后还是能够继续消费,不会影响集群的使用
将第二套的master节点关掉
切换完成后还是能够继续消费,不会影响集群的使用