1.简介
1.1此处持久化和之前的持久化的区别
MQ高可用:事务、可持久、签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。
1.2持久化是什么?
持久化是什么?一句话就是:ActiveMQ宕机了,消息不会丢失的机制。
说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
2.持久化的方式
2.1AMQ Message Store
基于文件的存储机制,是以前的默认机制,现在不再使用。
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
2.2kahaDB
基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。
官网文档:http://activemq.aache.org/kahadb,官网上还有一些其他配置参数。
配置文件activemq.xml中,如下
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
日志文件的存储目录在:%activemq安装目录%/data/kahadb
2.3JDBC消息存储
2.3.1原理图
2.3.2添加mysql数据库的驱动包到lib文件夹
2.3.3 jdbcPersistenceAdapter配置
修改配置文件activemq.xml。将之前的替换为jdbc的配置。如下:
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
2.3.4 数据库连接池配置
需要我们准备一个mysql数据库,并创建一个名为activemq的数据库。在标签和标签之间插入数据库连接池配置。
之后需要建一个数据库,名为activemq。新建的数据库要采用latin1 或者ASCII编码
默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。
2.3.5建库SQL和创表说明
重启activemq。会自动生成如下3张表。如果没有自动生成,需要我们手动执行SQL。如果实在不行,下面是手动建表的SQL:
-- auto-generated definition
create table ACTIVEMQ_ACKS
(
CONTAINER varchar(250) not null comment '消息的Destination',
SUB_DEST varchar(250) null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
CLIENT_ID varchar(250) not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
SUB_NAME varchar(250) not null comment '订阅者名称',
SELECTOR varchar(250) null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
LAST_ACKED_ID bigint null comment '记录消费过消息的ID',
PRIORITY bigint default 5 not null comment '优先级,默认5',
XID varchar(250) null,
primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';
create index ACTIVEMQ_ACKS_XIDX
on ACTIVEMQ_ACKS (XID);
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
ID bigint not null
primary key,
TIME bigint null,
BROKER_NAME varchar(250) null
);
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
ID bigint not null
primary key,
CONTAINER varchar(250) not null,
MSGID_PROD varchar(250) null,
MSGID_SEQ bigint null,
EXPIRATION bigint null,
MSG blob null,
PRIORITY bigint null,
XID varchar(250) null
);
create index ACTIVEMQ_MSGS_CIDX
on ACTIVEMQ_MSGS (CONTAINER);
create index ACTIVEMQ_MSGS_EIDX
on ACTIVEMQ_MSGS (EXPIRATION);
create index ACTIVEMQ_MSGS_MIDX
on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
create index ACTIVEMQ_MSGS_PIDX
on ACTIVEMQ_MSGS (PRIORITY);
create index ACTIVEMQ_MSGS_XIDX
on ACTIVEMQ_MSGS (XID);
ACTIVEMQ_MSGS数据表:
ACTIVEMQ_ACKS数据表:
ACTIVEMQ_LOCK数据表:
2.3.6总结
如果是queue:在没有消费者消费的情况下会将消息保存到activemq_msgs表中,消费之后这些消息会被立刻删除。
如果是topic:一般是先启动消费者订阅然后在生产的情况下将消息保存到activemq_acks。
- 注意点1:要将使用到的jar文件房知道ActiveMQ安装目录下的lib中。包括mysql_jdbc和对应的连接池的jar包
- 注意点2:在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时候,第一次启动ActiveMQ时,将会自动创建数据表,但是启动之后可以更改为false
- 下划线坑爹:java.lang.IllageStateException:BeanFactory not initialized or aleady closed:这是因为您的操作机器中有_符号,更改机器名后重启就行。
2.4 LevelDB消息存储
过于新兴的技术,现在有些不确定。这种文件系统是ActiveMQ5.8之后引进的,他和KhaDB非常近似,也是基于文件的本地数据库的存储形式,但是他比KahaDB更快的持久性。他不使用自定义的B-Tree实现索引预写日志,而是使用levelDB索引。
<persistenceAdapter>
<levelDBdirectory="acivemq-data"/>
</persistenceAdapter>
2.5JDBC Message Store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。
下面是基于上面JDBC配置,再做一点修改:
3.总结
① jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
② 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
③ 持久化机制演变的过程:
从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。