ActiveMQ集群替代方案:客户端负载均衡开源组件

吞吐量与性能需求

ActiveMQ由于提供了可靠的消息传输功能,因此被作为系统不同模块间传递异步消息的重要组件。

但随着业务发展,交易量出现明显增长,单节点的ActiveMQ处理性能很快成为瓶颈。同时,由于业务需要,ActiveMQ必需提供更高的吞吐量,并保持消息处理的延时在较低水平。为此,我的团队寻求较为完备的扩容方式。

官网方案

根据ActiveMQ官方提供的集群方式(阅读这里)进行部署测试,发现了以下问题。

首先测试的是Master-Slave的主备集群,但其只是提供了一种高可用的方式,但对系统吞吐量没有帮助。

其次测试的是Networks of brokers的多节点集群,如果生产者与消费者不在同一个节点上,那么消息需要在不同节点间拷贝。消息经过的节点越多,处理消息的延时也就越高。另外,这种集群方式由于无法追踪消息的路径,造成运维的难度也很高。

ActiveMQ集群替代方案:客户端负载均衡开源组件
image.png

因此,官方提供的集群方式不能满足我们的需求,必须另谋出路。

客户端负载均衡方案

经过思考,参考微服务架构中客户端负载均衡的调用方式,我设计了一个ActiveMQ的客户端负载均衡方案。

ActiveMQ集群替代方案:客户端负载均衡开源组件
image.png

这是一种看起来非常简单直观的方案。如上图所示,

  1. 部署多个独立的ActiveMQ broker实例,实例相互独立,没有联系。
  2. 生产者客户端采用负载均衡的方式向多个broker中发送消息,降低每个broker的负载。
  3. 消费者客户端同时监听多个broker中的消息,无论消息被发送到哪个broker,都可以被成功消费。

组件与代码

为此,我写了一个基于Spring-Boot的自动加载组件,在这里
这个组件除了实现客户端负载均衡的功能,还实现了对开发人员透明,使得开发人员可以照常使用Spring的JmsTemplate或JmsMessageTemplate发送消息 ,并使用@JmsListener注解接收消息,
使用此组件与使用Spring默认组件唯一的区别就是需要配置多个ActiveMQ broker的URL地址。

  1. 添加maven依赖(现在已经可以从maven的*仓库下载到这个依赖)。
<dependency>
    <groupId>com.github.fonoisrev</groupId>
    <artifactId>spring-boot-starter-activemq-clientSideLoadBalance</artifactId>
    <version>1.1.0</version>
</dependency>
  1. 添加ActiveMQ broker的URL地址到Spring-Boot应用的配置文件(properties或yaml格式)中。
activemq.loadbalance.enabled=true
activemq.loadbalance.urls[0]=tcp://localhost:61616
activemq.loadbalance.urls[1]=tcp://localhost:61617
...(more urls)

activemq:
  loadbalance:
    enabled: true
    urls:
      - tcp://localhost:61616
      - tcp://localhost:61617
      ...(more urls)
  1. 如果应用是Spring但没有计划迁移到Spring-Boot风格,那么请忽略第2步的配置,转而使用下面的配置。
    <!-- Config SharedMultiConnectionFactory -->
    <bean class="com.github.fonoisrev.jms.connection.SharedMultiConnectionFactory"
          id="sharedMultiConnectionFactory">
       <constructor-arg name="urls">
            <!-- ActiveMQ urls here -->
            <list>
                <value>tcp://localhost:61616</value>
                <value>tcp://localhost:61617</value>
                <!-- ... more urls -->
            </list>
        </constructor-arg>
    </bean>
    
    <!-- this is the LoadBalanceJmsConnectionFactory for client send -->
    <bean class="com.github.fonoisrev.jms.connection.LoadBalanceJmsConnectionFactory" 
          id="loadBalanceJmsConnectionFactory">
        <constructor-arg ref="sharedMultiConnectionFactory"/>
    </bean>
    
    <!-- this is the JmsMessagingTemplate or can replace with JmsTemplate -->
    <bean class="org.springframework.jms.core.JmsMessagingTemplate" 
          id="jmsMessagingTemplate">
        <property name="connectionFactory" ref="loadBalanceJmsConnectionFactory"/>
    </bean>
    
    <!-- define your own MessageListener -->
    <bean class="com.github.fonoisrev.listener.MyMessageListener" id="messageListener"/>
    
    <!-- define MultiJmsMessageListenerContainer(s) -->
    <!-- configuration is same as org.springframework.jms.listener.DefaultMessageListenerContainer -->
    <bean class="com.github.fonoisrev.jms.container.MultiJmsMessageListenerContainer" 
         id="container1">
        <!--must set connectionFactory first-->
        <property name="connectionFactory" ref="sharedMultiConnectionFactory"/>
        <property name="destinationName" value="test"/>
        <property name="messageListener" ref="messageListener"/>
        <property name="concurrency" value="1-2"/>
    </bean>

如果有特殊需求,可以自行转换为Java Configuration。

  1. 具体源码可以参考以下网址
    https://github.com/fonoisrev/ActiveMQ-ClientSide-LoadBalance
    如使用问题,欢迎在此项目的issue中反馈。

额外再说

理论上,客户端负载均衡方案是可以用于所有不同类型的MQ的,但具体到实现上,不同协议的具体代码不同,因此需要针对不同协议提供不同的实现。由于时间关系,我只实现了基于JMS协议的ActiveMQ的组件。
另外,某些MQ(如Kafka)提供了比较好用的集群支持,因此就不必再自行实现了。

上一篇:日志框架 - 基于spring-boot - 实现2 - 消息定义及消息日志打印


下一篇:找不到完美数据科学家?你还可以组建一支数据科学梦之队