在高并发、对稳定性要求极高的系统中,高可用的是必不可少的,当然ActiveMQ也有自己的集群方案。从ActiveMQ 5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper + LevelDB 的 Master-Slave 实现方式。
相关文章:
范例项目: http://wosyingjun.iteye.com/blog/2312553
ActiveMQ的简单实用:http://wosyingjun.iteye.com/blog/2314681
一. ActiveMQ的高可用原理
使用ZooKeeper(集群)注册所有的ActiveMQ Broker。只有其中的一个Broker可以提供服务,被视为 Master,其他的 Broker 处于待机状态,被视为Slave。如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。
Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到 连接至 Master的Slaves。如果Master宕了,得到了最新更新的Slave会成为 Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。
是不是觉得和Redis Sentinel主从高可用的方式很像,这里的zookeeper起到的作用和reids里的sentinel作用差不多。
另外,附上官方文档的一则警告,请使用者注意。replicated LevelDB 不支持延迟或者计划任务消息。这 些消息存储在另外的LevelDB文件中,如果使用延迟或者计划任务消息,将不会复制到Slave Broker上,不能实现消息的高可用。
二. ActiveMQ的持久化方式
ActiveMQ有三种持久化方式(在activemq.xml可配):
(1) 基于共享文件系统(KahaDB,默认)
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
(2) 基于JDBC
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
<!--注意:需要添加mysql-connector-java相关的jar包到avtivemq的lib包下-->
<bean id="MySQL-DS" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/beautyssm_mq?useUnicode=true&characterEncoding=UTF-8"/>
<property name="username" value="root"/>
<property name="password" value="xxxx"/>
</bean>
(3) 基于可复制的LevelDB(常用于集群)
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb" #数据存储路径
replicas="3" #节点个数
bind="tcp://0.0.0.0:62621" #用于各个节点之间的通讯
zkAddress="localhost:2181,localhost:2182,localhost:2183"
hostname="localhost"
zkPath="/activemq/leveldb-stores"/>#在zookeeper中集群相关数据存放路径
</persistenceAdapter>
LevelDB是Google开发的一套用于持久化数据的高性能类库。LevelDB并不是一种服务,用户需要自行实现Server。是单进程的服务,能够处理十亿级别规模Key-Value型数据,占用内存小。
这里我们采用第三种方式,也是官网推荐的方式。
三. 高可用的部署
1、ActiveMQ的高可用集群基于Zookeeper的高可用集群,所以要先部署Zookeeper集群
2、在3个ActiveMQ节点中配置conf/activemq.xml中的监控端口
节点1:
<property name="port" value="8161"/>
节点2:
<property name="port" value="8162"/>
节点3:
<property name="port" value="8163"/>
3、在3个ActiveMQ节点中配置conf/activemq.xml中的持久化适配器
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:6262?"
zkAddress="localhost:2181,localhost:2182,localhost:2183"
hostname="localhost"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
</broker>
注:每个 ActiveMQ 的 BrokerName 必须相同,否则不能加入集群。
4、修改各节点的消息端口:
节点1:
<transportConnector name="openwire" uri="tcp://0.0.0.0:61611maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
节点2:
<transportConnector name="openwire" uri="tcp://0.0.0.0:61612maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
节点3:
<transportConnector name="openwire" uri="tcp://0.0.0.0:61613maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
5、按顺序启动 3 个 ActiveMQ 节点:
$ /usr/local/activemq/activemq-01/bin/activemq start
$ /usr/local/activemq/activemq-02/bin/activemq start
$ /usr/local/activemq/activemq-03/bin/activemq start
监听日志:
$ tail -f /usr/local/activemq/activemq-01/data/activemq.log
$ tail -f /usr/local/activemq/activemq-02/data/activemq.log
$ tail -f /usr/local/activemq/activemq-03/data/activemq.log
四. 集群部署
之前已经实现了ActiveMQ的高可用部署,单仅仅是高可用集群,无法达到负载均衡的作用,接下来只需简单配置就能完成可以实现负载均衡的集群功能:
在集群1的activemq.xml中链接集群2(在persistenceAdapter标签前配置):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.2.100:61611,tcp://192.168.2.101:61612,tcp://192.168.2.102:61613)" duplex="false"/>
</networkConnectors>
在集群2的activemq.xml中链接集群1(在persistenceAdapter标签前配置):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.1.100:61611,tcp://192.168.1.101:61612,tcp://192.168.1.102:61613)" duplex="false"/>
</networkConnectors>
这样就实现了ActiveMQ的集群高可用负载均衡功能。
三. 客户端连接:
ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能访问。所以客户端连接Broker应该使用failover协议。
配置文件地址应为:
failover:(tcp://192.168.1.100:61611,tcp://192.168.1.100:61612,tcp://192.168.1.100:61613)?randomize=false
或:
failover:(tcp://192.168.2.100:61611,tcp://192.168.2.100:61612,tcp://192.168.2.100:61613)?randomize=false