吞吐量与性能需求
ActiveMQ由于提供了可靠的消息传输功能,因此被作为系统不同模块间传递异步消息的重要组件。
但随着业务发展,交易量出现明显增长,单节点的ActiveMQ处理性能很快成为瓶颈。同时,由于业务需要,ActiveMQ必需提供更高的吞吐量,并保持消息处理的延时在较低水平。为此,我的团队寻求较为完备的扩容方式。
官网方案
根据ActiveMQ官方提供的集群方式(阅读这里)进行部署测试,发现了以下问题。
首先测试的是Master-Slave的主备集群,但其只是提供了一种高可用的方式,但对系统吞吐量没有帮助。
其次测试的是Networks of brokers的多节点集群,如果生产者与消费者不在同一个节点上,那么消息需要在不同节点间拷贝。消息经过的节点越多,处理消息的延时也就越高。另外,这种集群方式由于无法追踪消息的路径,造成运维的难度也很高。
因此,官方提供的集群方式不能满足我们的需求,必须另谋出路。
客户端负载均衡方案
经过思考,参考微服务架构中客户端负载均衡的调用方式,我设计了一个ActiveMQ的客户端负载均衡方案。
这是一种看起来非常简单直观的方案。如上图所示,
- 部署多个独立的ActiveMQ broker实例,实例相互独立,没有联系。
- 生产者客户端采用负载均衡的方式向多个broker中发送消息,降低每个broker的负载。
- 消费者客户端同时监听多个broker中的消息,无论消息被发送到哪个broker,都可以被成功消费。
组件与代码
为此,我写了一个基于Spring-Boot的自动加载组件,在这里。
这个组件除了实现客户端负载均衡的功能,还实现了对开发人员透明,使得开发人员可以照常使用Spring的JmsTemplate或JmsMessageTemplate发送消息 ,并使用@JmsListener注解接收消息,
使用此组件与使用Spring默认组件唯一的区别就是需要配置多个ActiveMQ broker的URL地址。
- 添加maven依赖(现在已经可以从maven的*仓库下载到这个依赖)。
<dependency>
<groupId>com.github.fonoisrev</groupId>
<artifactId>spring-boot-starter-activemq-clientSideLoadBalance</artifactId>
<version>1.1.0</version>
</dependency>
- 添加ActiveMQ broker的URL地址到Spring-Boot应用的配置文件(properties或yaml格式)中。
activemq.loadbalance.enabled=true
activemq.loadbalance.urls[0]=tcp://localhost:61616
activemq.loadbalance.urls[1]=tcp://localhost:61617
...(more urls)
或
activemq:
loadbalance:
enabled: true
urls:
- tcp://localhost:61616
- tcp://localhost:61617
...(more urls)
- 如果应用是Spring但没有计划迁移到Spring-Boot风格,那么请忽略第2步的配置,转而使用下面的配置。
<!-- Config SharedMultiConnectionFactory -->
<bean class="com.github.fonoisrev.jms.connection.SharedMultiConnectionFactory"
id="sharedMultiConnectionFactory">
<constructor-arg name="urls">
<!-- ActiveMQ urls here -->
<list>
<value>tcp://localhost:61616</value>
<value>tcp://localhost:61617</value>
<!-- ... more urls -->
</list>
</constructor-arg>
</bean>
<!-- this is the LoadBalanceJmsConnectionFactory for client send -->
<bean class="com.github.fonoisrev.jms.connection.LoadBalanceJmsConnectionFactory"
id="loadBalanceJmsConnectionFactory">
<constructor-arg ref="sharedMultiConnectionFactory"/>
</bean>
<!-- this is the JmsMessagingTemplate or can replace with JmsTemplate -->
<bean class="org.springframework.jms.core.JmsMessagingTemplate"
id="jmsMessagingTemplate">
<property name="connectionFactory" ref="loadBalanceJmsConnectionFactory"/>
</bean>
<!-- define your own MessageListener -->
<bean class="com.github.fonoisrev.listener.MyMessageListener" id="messageListener"/>
<!-- define MultiJmsMessageListenerContainer(s) -->
<!-- configuration is same as org.springframework.jms.listener.DefaultMessageListenerContainer -->
<bean class="com.github.fonoisrev.jms.container.MultiJmsMessageListenerContainer"
id="container1">
<!--must set connectionFactory first-->
<property name="connectionFactory" ref="sharedMultiConnectionFactory"/>
<property name="destinationName" value="test"/>
<property name="messageListener" ref="messageListener"/>
<property name="concurrency" value="1-2"/>
</bean>
如果有特殊需求,可以自行转换为Java Configuration。
- 具体源码可以参考以下网址
https://github.com/fonoisrev/ActiveMQ-ClientSide-LoadBalance
如使用问题,欢迎在此项目的issue中反馈。
额外再说
理论上,客户端负载均衡方案是可以用于所有不同类型的MQ的,但具体到实现上,不同协议的具体代码不同,因此需要针对不同协议提供不同的实现。由于时间关系,我只实现了基于JMS协议的ActiveMQ的组件。
另外,某些MQ(如Kafka)提供了比较好用的集群支持,因此就不必再自行实现了。