ActiveMQ基础教程(二):安装与配置(单机与集群)

  因为本文会用到集群介绍,因此准备了三台虚拟机(当然读者也可以使用一个虚拟机,然后使用不同的端口来模拟实现伪集群):

    192.168.209.133 test1
192.168.209.134 test2
192.168.209.135 test3

  因为ActiveMQ是java编写,因此需要java的运行环境,这个不做介绍,网上有一堆的教程。

  其次,下载ActiveMQ包,官网下载地址:https://archive.apache.org/dist/activemq/ ,读者可以选择一个版本下载使用,比如,当前最新版是5.16.1,下载地址:https://archive.apache.org/dist/activemq/5.16.1/

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  官网下载速度比较慢,可能需要一两个小时,有百度网盘会员的可以移步百度网盘:https://pan.baidu.com/s/1-ToyzCqo_ypk7FXcjgI3yg (提取码: jcd7 )

    

  单机安装与配置

  首先将tar包传到linux上去(我这里使用的是Ubuntu16.04),然后在tar包所在目录进行解压:  

   # -C 表示解压出来的文件保存目录,默认是tar包所在目录,这里我选择的是/opt目录,注意修改权限,如果你是root用户,则不需要修改
  sudo tar -zxf apache-activemq-5.16.1-bin.tar.gz -C /opt

  其实我们下载好的tar包是编译打包好的,只需解压就可以启动了:  

   # 使用后台线程启动
  sudo ./bin/activemq start
  # 使用控制台方式启动,这种方式会造成当前shell阻塞,如果想使用服务单元或者supervisor这样的工具做守护进程,那么应该采用这种启动方式
  sudo ./bin/activemq console
  # 停止应用
  sudo ./bin/activemq stop
  # 重启应用
  sudo ./bin/activemq restart

  第一次建议采用控制台console的形式启动,看时候会报错,报错则会打印异常信息,方便我们排查,启动后大概是这样的:

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  注意输出的信息,可以看到:  

  1、ActiveMQ版本是5.16.1,使用KahaDB做持久化,版本7
  2、openwire方式连接启动,监听:tcp://test1:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600,还有ampq、stomp、mqtt、ws等分别在不同端口启动监听(注,这里的test1是在hosts中添加的,看我上面提供的三个虚拟机)
  3、Jetty服务启动,其实就是启动ActiveMQ的管理后台
  4、ActiveMQ管理后台地址:http://127.0.0.1:8161/,ActiveMQ的Jolokia API地址:http://127.0.0.1:8161/api/jolokia/,Jolokia API主要是通过API接口获取ActiveMQ的状态信息,方便第三方程序进行监控

  主要到,上面服务启动后,管理后台启动在127.0.0.1:8161,这表明我们只能在本地访问,虚拟机以外的主机是访问不了的,所以我需要修改相关配置。

  配置文件在 conf 目录下,我们常用到的配置文件就是activemq.xml以及jetty.xml,里面全是一些 java bean(有点少见的东西)。activemq.xml主要是ActiveMQ相关配置,而jetty.xml主要是管理后台的一些配置,接下来介绍一些常用的几个配置:

  1、修改Connector

  打开conf/activemq.xml,找到 transportConnectors 节点配置,注释掉多余的连接协议,只留自己需要的就行了,比如我一般用openwire,那我就将其它的注释掉。

  说明一下,为什么要这么做,主要是没必要,也可以避免端口冲突,比如,rabbitmq默认采用ampq监听5672端口,这时你会发现activemq启动不了,因为端口冲突。  

  <transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>

  需要注意的是,连接端口也是在上面的uri中去配置。

  2、查看持久化方式配置

  在conf/activemq.xml找到 persistenceAdapter 节点,这时消息持久化模式配置,可以看到默认的方式是kahaDB,数据目录是data/kahadb  

  <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

  这个节点只是说明一下,因为后续介绍ActiveMQ集群模式就是修改这个节点kahaDB为replicatedLevelDB进行配置,所以这个暂时可以不用动,当然如果你愿意,可以修改成AMQ、JDBC等其他方式。

  3、死信队列配置

  ActiveMQ提供了众多的策略,这里以死信队列策略配置为例来介绍。

  在conf/activemq.xml找到 destinationPolicy 节点,这里配置的是一些队列和topic的策略,当然包括死信队列了,默认策略配置如下:

  ActiveMQ基础教程(二):安装与配置(单机与集群)  

   说明:
policyEntry 节点是配置入口
  topic=">" 表示对所有的topic生效,我们也可以增加一个policyEntry节点,用途queue属性来表示对队列生效,如下文介绍
  pendingMessageLimitStrategy  消息限制策略,只对Topic且非持久化订阅者有效,用于当通道中有大量的消息积压时,broker可以保留的消息量,这是为了防止Topic中有慢速消费者,导致整个通道消息积压
  constantPendingMessageLimitStrategy   保留固定条数的消息,如果消息量超过了限制,将使用消息剔除策略移除消息

  ActiveMQ死信队列配置策略主要有两种:共享的死信策略(SharedDeadLetterStrategy)、单独的死信策略(IndividualDeadLetterStrategy) 

  共享的死信策略(SharedDeadLetterStrategy)   

  共享的死信策略(SharedDeadLetterStrategy)表示将所有的死信消息保存在一个共享队列中,这是ActiveMQ的默认策略
  常用配置属性:
  deadLetterQueue:共享队列名称,默认的队列名为 ActiveMQ.DLQ
  processExpired:表示是否将过期消息放入死信队列,默认为true
  processNonPersistent:表示是否将“非持久化”消息放入死信队列,默认为false

  需要注意的是,之前共享的死信策略配置可以是这样子:  

    <deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ.Queue"/>
</deadLetterStrategy>

  但是不知道从什么时候起,这样配置会抛出异常:  

    Caused by: java.lang.IllegalStateException: Cannot convert value of type 'java.lang.String' to required type 'org.apache.activemq.command.ActiveMQDestination' for property 'deadLetterQueue': no matching editors or conversion strategy found
at org.springframework.beans.TypeConverterDelegate.convertIfNecessary(TypeConverterDelegate.java:307)
at org.springframework.beans.AbstractNestablePropertyAccessor.convertIfNecessary(AbstractNestablePropertyAccessor.java:588)
... 52 more

  但是我们可以使用bean去配置,如:  

   <bean id="deadLetterQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="DLQ.Queue"/>
</bean>
  ......
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" >
<deadLetterStrategy>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy">
<property name="deadLetterQueue" ref="deadLetterQueue" />
</bean>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
      ....
  </broker>

  上面的配置中,定义了一个id为deadLetterQueue的bean,然后在策略中,queue=">" 表示对所有队列生效,如果想对topic生效,改成 topic=“>” 即可,而策略时一个bean,class指向SharedDeadLetterStrategy,它的属性deadLetterQueue指向我们前面定义的那个bean的id

  其实,上面的配置就是说,对于所有的队列的死信消息,通通发到一个名为DQL.Queue的队列中去。

  单独的死信策略(IndividualDeadLetterStrategy)  

  单独的死信策略(IndividualDeadLetterStrategy)表示把死信消息放入各自的死信通道中,区分的条件时使用不同的前缀,死信通道可以是队列,也可以是topic
  常用配置属性:
  queuePrefix:当死信队列是queue时使用的前缀,默认是 ActiveMQ.DLQ.Queue.
  topicPrefix:当死信队列是topic是使用的前缀,默认是 ActiveMQ.DLQ.Topic.
  useQueueForTopicMessages:表示是否将Topic的死信消息保存在Queue中,默认为true,表示Topic的死信消息保存至Queue中
  processExpired:表示是否将过期消息放入死信队列,默认为true
  processNonPersistent:表示是否将“非持久化”消息放入死信队列,默认为false

  例如:    

        <destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="queue.DLQ." useQueueForQueueMessages="false"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
<deadLetterStrategy>
<individualDeadLetterStrategy topicPrefix="topic.DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

  上面的例子中,queue=">" 表示对所有队列启用策略,即对每个的队列的死信消息,保存到一个以queue.DLQ.[队列名]的topic中,比如一个名为demo的队列对应的死信通道是一个名称是queue.DLQ.demo的topic。

  同理,topic=">"表示对所有topic生效,表示将每个topic的死信消息保存在一个已topic.DLQ.[topic名]的队列中,比如一个名称为demo的topic对应的死信通道是一个名称为topic.DLQ.demo的队列

  4、管理后台启动配置

  前面说到默认情况下启动时,ActiveMQ管理后台绑定到127.0.01:8161,这表示我们只能在本地访问,这肯定是不能接受的,我们需要修改这个地址。

  打开conf/jetty.xml,找到id="jettyPort"的bean,

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  修改这里的host成0.0.0.0,端口随意,也可以使用默认的8161:  

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>

  保存后启动ActiveMQ,然后就可以在浏览器*问管理后台了,比如我部署的服务器IP是192.168.209.133,那么就访问http://192.168.209.133:8161,接着会要求输入账号和密码,默认都是admin(至于怎么添加用户及管理权限,这就不在本文范围内了,有空再单独用博文介绍),登录后进入欢迎也,然后在点击Manage ActiveMQ broker进入管理后台:

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  展示的管理界面如下,至于相关页面的操作信息等等,就不做介绍了:

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  集群部署及配置

  说起ActiveMQ的集群,其实它是一种主从架构,是一种和redis集群中的主从模式很像,就是集群只有主节点提供服务,子节点只是备用,当主节点挂了,就会选用一个子节点作为主节点继续提供服务,如:

  1、正常情况下是主节点提供服务,子节点作为备用只从主节点同步数据

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  2、当主节点挂了,就会重新选择一个子节点作为主节点重新提供服务

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  3、如果之后主节点又起来了,此时它将变作为子节点留作备用

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  根据ActiveMQ官网的介绍,ActiveMQ集群有三种部署方式:基于共享文件系统的主从架构(文件系统如:SAN)、基于数据库的主从架构,基于Zookeeper的主从架构(http://activemq.apache.org/masterslave.html

  这里介绍基于Zookeeper的主从架构。

  准备部署

  1、因为是基于Zookeeper的主从架构,因此我们需要一个Zookeeper集群服务,可以参考博文:Zookeeper基础教程(二):Zookeeper安装

  我这里采用集群地址就是 192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181 (其实就是在当前准备部署ActiveMQ的三台虚拟机上部署的Zookeeper集群)

  2、安装上面介绍的单机部署的方式,先在三个节点都安装部署完成ActiveMQ

  3、接着就到最重要的环节了。

  打开conf/activemq.xml,修改 persistenceAdapter 节点中的持久化方式为 replicatedLevelDB ,如:

  test1节点    

        <persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61619"
zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
zkPath="/activemq"
hostname="test1"
/>
</persistenceAdapter>

  test2节点

        <persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61619"
zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
zkPath="/activemq"
hostname="test2"
/>
</persistenceAdapter>

  test3节点

        <persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:61619"
zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
zkPath="/activemq"
hostname="test3"
/>
</persistenceAdapter>

  说明一下:  

    directory:表示的是当前节点数据文件保存的目录,不存在就会自动创建,默认值是 LevelDB,注意,这个目录是根据ActiveMQ的根目录来设置,比如上面的配置,是将数据文件保存在ActiveMQ的根目录下的 data/leveldb 目录下
replicas:集群节点数,因为采用Zookeeper,要求这个值至少为3
   bind:集群通信地址端口配置,就是说如果当前节点变成了master节点,然后slave节点会根据这个地址端口进行数据同步。
   zkAddress:Zookeeper集群地址,默认值是 127.0.0.1:2181
   zkPath:根ZNode节点配置,你总不希望Zookeeper只为一个ActiveMQ集群服务吧?默认值是 /default
   zkPassword:如果Zookeeper有认证,那这个就是认证信息,比如我这里没有,就可以不用配置
   hostname:集群内节点通信时采用的hostname,可以认为就是节点的名字,每个节点应该设置成不一样的hostname,建议采用IP作为hostname的值,如果不设置,ActiveMQ将会自动创建一个hostname

  注意,集群要求每个节点对Zookeeper的配置都一样,比如我这里,要求每个节点的 replicas、zkAddress、zkPath 配置值是一样的。replicatedLevelDB节点更多的配置可以参考官网介绍:https://activemq.apache.org/replicated-leveldb-store

  在启动Zookeeper之后,在每个节点ActiveMQ的根目录下使用下面的命令启动ActiveMQ:  

   # 这里采用console启动,因为它会输出日志,如果报错方便我们处理,不报错的话,可以在使用start后台启动: sudo ./bin/activemq start
  sudo ./bin/activemq console

  启动之后,查看Zookeeper,在上面配置的zkPath节点下会有每个broker节点的配置:

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  管理后台

  集群启动之后,我们可以查看管理后台,但是需要注意的是,ActiveMQ集群是master-slave模式,只有master节点对外提供服务,slave节点只是备份,因此,如果要访问master节点的管理后台。

  比如上图,我这边test3被选为master,因此我访问的是:http://192.168.209.135:8161

  ActiveMQ基础教程(二):安装与配置(单机与集群)

  那么你问,如果master节点挂了呢?那就对不起了,原master节点的管理后台地址就访问不了了,你必须访问新的master节点的管理后台才行,如果嫌麻烦,可以自行使用nginx做处理。

  另外,如果集群部署启动报错:java.io.IOException: com/google/common/util/concurrent/internal/InternalFutureFailureAccess

  可以参考我这里的解决方法:关于ActiveMQ+Zookeeper做集群时,解决启动报错:java.io.IOException: com/google/common/util/concurrent/internal/InternalFutureFailureAccess

  结语

  ActiveMQ安装好了,我们也可以使用程序访问试试,提示一下,单机部署的ActiveMQ和集群部署的ActiveMQ在代码中的访问地址形式可能不一样,比如我用的C#使用 Apache.NMS.ActiveMQ 第三方包访问ActiveMQ:

  单机部署时访问地址是:  

    string brokerUri = "activemq:tcp://192.168.209.133:61616";
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

  集群部署时访问地址是:  

    string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)";
NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

  注意:集群部署时访问地址中的端口是conf/activemq.xml中transportConnectors节点下配置的端口,不是replicatedLevelDB节点bind属性配置的节点,bind属性配置的节点是如果当前节点是master时,它与其它slave的通信端口。

  另外,知道怎么部署了,我们还可以试下ActiveMQ众多的策略配置及用法,还可以尝试基于共享文件系统的主从架构(文件系统如:SAN)、基于数据库的主从架构搭建集群,想更深入点的话可以看看集群的负载均衡配置原理。

  ActiveMQ的安装部署就介绍到这里吧,主要是还有安装启动过程中还有几个报错,等之后再写吧。

  

上一篇:搭建基于MyEclipse的Hadoop开发环境


下一篇:如何在ASP.NET 5上搭建基于TypeScript的Angular2项目