ActiveMQ-5.13.0集群

ActiveMQ集群介绍

ActiveMQ具有强大和灵活的集群功能,但在使用的过程中会发现很多的缺点,ActiveMQ的集群方式主要由两种:Master-Slave(ActiveMQ5.8版本已不可用)和Broker Cluster。

Broker集群

一个常见的场景是有多个 JMS broker ,有一个客户连接到其中一个 broker 。如果这个 broker 失效,那么客户会自动重新连接到其它的 broker 。在 ActiveMQ 中使用 failover:// 协议来实现这个功能。如果某个网络上有多个 brokers 而且客户使用静态发现(使用 Static Transport 或 Failover Transport )或动态发现(使用 Discovery Transport ),那么客户可以容易地在某个 broker 失效的情况下切换到其它的 brokers, 下面介绍静态发现方式:

动态方式也同理--只是配置文件的不同:
ActiveMQ解压目录\examples\conf\activemq-dynamic-network-broker1.xml

首先把从官网下载的压缩包文件解压成三个文件夹!
Static-broker1的配置:
(ActiveMQ解压目录
1\examples\conf\activemq-static-network-broker1.xml)的内容全部替换到(ActiveMQ解压目
录1\conf\activemq.xml) ---默认tcp://0.0.0.0:61616
配置文件内容稍加修改如下:

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Create a static network of brokers
For more information, see: http://activemq.apache.org/networks-of-brokers.html To run this example network of ActiveMQ brokers run $ bin/activemq console xbean:examples/conf/activemq-static-network-broker1.xml and $ bin/activemq console xbean:examples/conf/activemq-static-network-broker2.xml in separate consoles
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="static-broker2" dataDirectory="${activemq.data}"> <plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins> <!-- Destination specific policies using destination names or wildcards -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy> <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
<managementContext>
<managementContext createConnector="true" connectorPort="1102"/>
</managementContext> <!--
The store and forward broker networks ActiveMQ will listen to
Create a duplex connector to the first broker
-->
<networkConnectors>
<networkConnector userName="admin" password="admin123456" uri="static:(tcp://localhost:61618)" duplex="true"/>
</networkConnectors> <persistenceAdapter>
<kahaDB directory="${activemq.data}/static-broker2/kahadb" />
</persistenceAdapter> <!-- The maximum amount of space the broker will use before slowing down producers -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage> <!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors> </broker> <import resource="jetty.xml"/> </beans>

(ActiveMQ解压目录2\examples\conf\activemq-static-network-broker1.xml)的内容全部替换 到(ActiveMQ解压目录2\conf\activemq.xml) ---默认tcp://0.0.0.0:61618
配置文件内容稍加修改如下:

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Create a static network of brokers
For more information, see: http://activemq.apache.org/networks-of-brokers.html To run this example network of ActiveMQ brokers run $ bin/activemq console xbean:examples/conf/activemq-static-network-broker1.xml and $ bin/activemq console xbean:examples/conf/activemq-static-network-broker2.xml in separate consoles
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="static-broker2" dataDirectory="${activemq.data}"> <plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins> <!-- Destination specific policies using destination names or wildcards -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy> <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
<managementContext>
<managementContext createConnector="true" connectorPort="1102"/>
</managementContext> <!--
The store and forward broker networks ActiveMQ will listen to
Create a duplex connector to the first broker
-->
<networkConnectors>
<networkConnector userName="admin" password="admin123456" uri="static:(tcp://localhost:61620)" duplex="true"/>
</networkConnectors> <persistenceAdapter>
<kahaDB directory="${activemq.data}/static-broker2/kahadb" />
</persistenceAdapter> <!-- The maximum amount of space the broker will use before slowing down producers -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage> <!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
</transportConnectors> </broker> <import resource="jetty.xml"/> </beans>

(ActiveMQ解压目录3\examples\conf\activemq-static-network-broker1.xml)的内容全部替换 到(ActiveMQ解压目录3\conf\activemq.xml) ---默认tcp://0.0.0.0:61618
配置文件内容稍加修改如下:

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Create a static network of brokers
For more information, see: http://activemq.apache.org/networks-of-brokers.html To run this example network of ActiveMQ brokers run $ bin/activemq console xbean:examples/conf/activemq-static-network-broker1.xml and $ bin/activemq console xbean:examples/conf/activemq-static-network-broker2.xml in separate consoles
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="static-broker2" dataDirectory="${activemq.data}"> <plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins> <!-- Destination specific policies using destination names or wildcards -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy> <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
<managementContext>
<managementContext createConnector="true" connectorPort="1102"/>
</managementContext> <!--
The store and forward broker networks ActiveMQ will listen to
Create a duplex connector to the first broker
-->
<networkConnectors>
<networkConnector userName="admin" password="admin123456" uri="static:(tcp://localhost:61616)" duplex="true"/>
</networkConnectors> <persistenceAdapter>
<kahaDB directory="${activemq.data}/static-broker2/kahadb" />
</persistenceAdapter> <!-- The maximum amount of space the broker will use before slowing down producers -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage> <!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61620"/>
</transportConnectors> </broker> <import resource="jetty.xml"/> </beans>

需要注意的是:由于是本地配置测试,故不能端口号相同(包括tcp和http),如果是三台服务器上配置,那么端口可以相同。

tcp:(ActiveMQ解压目录\conf\activemq.xml)

(1).<managementContext createConnector="true" connectorPort="1102"/>
(2).<networkConnector userName="admin" password="admin123456" uri="static:(tcp://localhost:61616)" duplex="true"/>
(3).<transportConnector name="openwire" uri="tcp://0.0.0.0:61620"/>

http:(ActiveMQ解压目录\conf\jetty.xml)

<property name="port" value="8162"/>

最后分别启动这三个activemq服务! (此处是windows7 64位系统)
(ActiveMQ解压目录1\bin\win64\activemq.bat)
(ActiveMQ解压目录2\bin\win64\activemq.bat)
(ActiveMQ解压目录3\bin\win64\activemq.bat)

测试:先启动消费者!然后启动生产者发送消息。(至于消费者和生产者代码,请留意我的另一篇博文activemq整合spring)
先测试两个服务同时启动情况,然后把其中一个服务关闭保留一个再测试。(点对点 一对多也进行测试)

上一篇:centos网络配置方法(手动设置,自动获取)


下一篇:CentOS 6.9使用Setup配置网络(解决dhcp模式插入网线不自动获取IP的问题)