对于activemq消息的持久化我们在第二节的时候就简单介绍过,今天我们详细的来分析一下activemq的持久化过程以及持久化插件。在生产环境中为确保消息的可靠性,我们肯定的面临持久化消息的问题,今天就一起来攻克他吧。
1. 持久化方式介绍
前面我们也简单提到了activemq提供的插件式的消息存储,在这里再提一下,主要有以下几种方式:
- AMQ消息存储-基于文件的存储方式,是activemq开始的版本默认的消息存储方式;
- KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式;
- JDBC消息存储-消息基于JDBC存储的;
- Memory消息存储-基于内存的消息存储,由于内存不属于持久化范畴,而且如果使用内存队列,可以考虑使用更合适的产品,如ZeroMQ。所以内存存储不在讨论范围内。
上面几种消息存储方式对于消息存储的逻辑来说并没有什么区别,只是在性能以及存储方式上来说有所不同。但是对于消息发送的方式来说,p2p和Pub/Sub两种类型的消息他们的持久化方式却是不同的:
对于点对点的消息一旦消费者完成消费这条消息将从broker上删除;对于发布订阅类型的消息,即使所有的订阅者都完成了消费,Broker也不一定会马上删除无用消息,而是保留推送历史,之后会异步清除无用消息。而每个订阅者消费到了哪条消息的offset会记录在Broker,以免下次重复消费。因为消息是顺序消费,先进先出,所以只需要记录上次消息消费到哪里就可以了。
因为AMQ现在已经被不再使用被KahaDB所替代,所以我们就讲KahaDB,JDBC消息存储在许多对可靠性要求高而对性能要求低一些的大公司还是经常使用的,下面我们就这两种持久化方式的使用做一节专题。
2. Kahadb
说到Kahadb之前我们还是得提到他的前身AMQ,AMQ是一种文件存储形式,他具有写入速度快和容易恢复的特点,消息存储在一个个的文件里,文件默认大小为32M,超过这个大小的消息将会存入下一个文件。当一个文件中的消息已经全部消费,那么这个文件将被标志我可删除,在下一个清除阶段这个文件将被删除。
如果需要使用持久化,则需要在前文中的配置文件applicationContext-ActiveMQ.xml中增加如下配置:
<persistenceAdapter>
<kahaDB directory="activemq-data"journalMaxFileLength="32mb"/>
</persistenceAdapter>
KahaDB的属性件下表格:
属性名称 | 属性值 | 描述 |
directory | activemq-data | 消息文件和日志的存储目录 |
indexWriteBatchSize | 1000 | 一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中 |
indexCacheSize | 1000 | 内存中,索引的页大小 |
enableIndexWriteAsync | false | 索引是否异步写到消息文件中 |
journalMaxFileLength | 32mb | 一个消息文件的大小 |
enableJournalDiskSyncs | true | 是否讲非事务的消息同步写入到磁盘 |
cleanupInterval | 30000 | 清除操作周期,单位ms |
checkpointInterval | 5000 | 索引写入到消息文件的周期,单位ms |
ignoreMissingJournalfiles | false | 忽略丢失的消息文件,false,当丢失了消息文件,启动异常 |
checkForCorruptJournalFiles | false | 检查消息文件是否损坏,true,检查发现损坏会尝试修复 |
checksumJournalFiles | false | 产生一个checksum,以便能够检测journal文件是否损坏。 |
5.4版本之后有效的属性: | ||
archiveDataLogs | false | 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除 |
directoryArchive | null | 存储被归档的消息文件目录 |
databaseLockedWaitDelay | 10000 | 在使用负载时,等待获得文件锁的延迟时间,单位ms |
maxAsyncJobs | 10000 | 同个生产者产生等待写入的异步消息最大量 |
concurrentStoreAndDispatchTopics | false | 当写入消息的时候,是否转发主题消息 |
concurrentStoreAndDispatchQueues | true | 当写入消息的时候,是否转发队列消息 |
5.6版本之后有效的属性: | ||
archiveCorruptedIndex | false | 是否归档错误的索引 |
由于在ActiveMQ V5.4+的版本中,KahaDB是默认的持久化存储方案。所以即使你不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。这种情况下,KahaDB文件所在位置是你的ActiveMQ安装路径下的/data/broker.Name/KahaDB子目录。其中{broker.Name}代表这个ActiveMQ服务节点的名称。下面我把刚启动服务并发送了消息之后的activemq安装目录打开给大家看看:
正式的生产环境还是建议在主配置文件中明确设置KahaDB的工作参数:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data"
journalMaxFileLength="32mb"
concurrentStoreAndDispatchQueues="false"
concurrentStoreAndDispatchTopics="false"
/>
</persistenceAdapter>
</broker>
3. 关系型数据库存储方案
从ActiveMQ 4+版本开始,ActiveMQ就支持使用关系型数据库进行持久化存储——通过JDBC实现的数据库连接。可以使用的关系型数据库囊括了目前市面的主流数据库。
使用JDBC的方式持久化我们就得修改之前的配置文件:
将其中的这段配置:
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
修改为下面这段内容:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# mysql-ds "/>
</persistenceAdapter>
在结点之后,增加数据源的配置,如下:
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemqdb?relaxAutoCommit=true&useUnicode=true&characterEncoding=utf-8"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<!-- Oracle DataSource Sample Setup -->
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@localhost:1521:activemqdb"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<!-- Oracle DataSource Sample Setup -->
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
<property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
还是在上一篇的实例工程中,我们改变一下applicationContext-ActiveMQ.xml的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">
<context:component-scan base-package="cn.edu.hust.activemq" />
<mvc:annotation-driven />
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin" />
<!-- 配置JMS连接工厂 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>first-queue</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />
<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">
<!--<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds">
</jdbcPersistenceAdapter>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
此时,重新启动MQ,就会发现db数据库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明activemq已经持久化成功啦!
-
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,主要数据库字段如下:
- container:消息的destination
- sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息
- client_id:每个订阅者都必须有一个唯一的客户端id用以区分
- sub_name:订阅者名称
- selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作
- last_acked_id:记录消费过的消息的id
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
-
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。主要的数据库字段如下:
- id:自增的数据库主键
- container:消息的destination
- msgid_prod:消息发送者客户端的主键
- msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid
- expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
- msg:消息本体的java序列化对象的二进制数据
- priority:优先级,从0-9,数值越大优先级越高
- activemq_acks用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。