前面讲到Apache ActiveMQ 集群配置方法 的实现,主要是为了解决点单故障的情况,如果要对AMQ进行分流、提高吞吐率,那么可以尝试搭建一个负载均衡的AMQ集群。
要搭建这样的一个环境也是非常的简单,我们只需要增加几行配置项到activemq.xml里就好,剩下的事情全部都由AMQ去做。
AMQ负载均衡的实现有三种方案:
1、static
2、Multicast Discovery
3、MasterSlave Discovery
可以参考官网实现:
http://activemq.apache.org/networks-of-brokers.html
本例使用的是静态路由static,他的缺点就是需要把已知的节点都要预先配置进去,不像动态路由灵活。
在Apache ActiveMQ单点基本配置基础上,用新的activemq.xml替换:
activemq.xml:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerTester3" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <!-- 属性enableAudit=false,是防止消息在回流后被当做重复消息而不被转发 --> <policyEntry queue=">" producerFlowControl="false" memoryLimit="10mb" enableAudit="false"> <!-- 属性replayWhenNoConsumers=true,保证在该节点断开,并重启后,且consumers已经连接到另外一个节点上的情况下,消息自动回流到原始节点 --> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <!-- 该节点的配置必须要在persistenceAdapter节点之前 --> <networkConnectors> <!-- 静态路由 --> <networkConnector uri="static:(tcp://192.168.0.87:61617)" duplex="true"/> </networkConnectors> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="1400 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="nio://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> </transportConnectors> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks> </broker> <import resource="jetty.xml"/> </beans>
同样需要注意的是,如果在一台设备上部署多个AMQ实例,要注意修改对应的端口号。
e.g.:这里会存在一个问题,假设这里配置的节点数为3个节点(A,B,C),且配置了消息回流。
1、假设节点A不接收消费者,只接受生产者;
2、当消费者X连接到B节点消息A节点上的消息(由于AMQ的机制,B节点会预先消费A节点上一定数量的消息到B节点,默认值1000,且假设只有1000个消息待消费),在消费过程中B节点断开,且B节点上有未消费的消息存在,这时消费者X自动路由到C节点上想继续消费未消费完成的消息;
3、由于当前的未消费的消息都存储于B节点上,当B节点重启后按照当前的配置方式,AMQ的逻辑应该是当X与A节点建立连接后B节点消息会回流,且可以正常消费,而当前消费者X是连接到了C节点上。
如果在业务上发生上面的这个场景,以当前的集群负载配置方式是无法满足需求的。