ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

个人博客网:https://wushaopei.github.io/    (你想要这里多有)

一、持久化机制

1、Activemq持久化

1.1 什么是持久化:

持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

1.2 持久化的作用

将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

详情——官网 : http://activemq.apache.org/persistence

1.3 ActiveMQ 支持的消息持久化机制

  • 为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。
  • ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
  • 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。
  • 消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。

注意:一句话:ActiveMQ宕机了,消息不会丢失的机制。

二、持久化方式

1、Activemq的持久化方式有几种

1.1 AMQ Mesage Store(了解)

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经
全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

注意:基于文件的存储方式,是以前的默认消息存储,现在不用了

1.2 KahaDB消息存储(默认)

 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

KahaDB 的属性配置 : http://activemq.apache.org/kahadb

说明:它使用一个事务日志和 索引文件来存储所有的地址

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档

db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-<x>.log 里的消息

一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。

db.free 存储空闲页 ID 有时会被清除

db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引

lock 顾名思义就是锁

四类文件+一把锁 ==》 KahaDB

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

1.3 JDBC消息存储

消息基于JDBC存储的

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

1.4 LevelDB消息存储(了解)

希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引

这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引独写日志,而是使用基于LevelDB的索引

题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

默认配置如下:

<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

1.5 JDBC Message Store with ActiveMQ Journal

2、JDBC存储消息(重点)

JDBC : 有一部分数据会真实的存储到数据库中
使用JDBC 的持久化,

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

修改配置文件,默认 kahaDB

修改之前:

<persistenceAdapter>

       <kahaDB directory="${activemq.data}/kahadb"/>  

 </persistenceAdapter>

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

修改之后:

<persistenceAdapter>

      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>

 </persistenceAdapter>

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

在activemq 的lib 目录下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

修改配置文件 : activemq.xml 使其连接自己windows 上的数据库,并在本地创建名为activemq 的数据库

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

让linux 上activemq 可以访问到 mysql ,之后产生消息。

ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock

activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存

activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker

activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除

但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。

如果表没生成,可能需要自己创建

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
CONTAINER varchar(250) not null comment '消息的Destination',
SUB_DEST varchar(250) null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
CLIENT_ID varchar(250) not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
SUB_NAME varchar(250) not null comment '订阅者名称',
SELECTOR varchar(250) null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
LAST_ACKED_ID bigint null comment '记录消费过消息的ID',
PRIORITY bigint default 5 not null comment '优先级,默认5',
XID varchar(250) null,
primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存'; create index ACTIVEMQ_ACKS_XIDX
on ACTIVEMQ_ACKS (XID); -- auto-generated definition
create table ACTIVEMQ_LOCK
(
ID bigint not null
primary key,
TIME bigint null,
BROKER_NAME varchar(250) null
); -- auto-generated definition
create table ACTIVEMQ_MSGS
(
ID bigint not null
primary key,
CONTAINER varchar(250) not null,
MSGID_PROD varchar(250) null,
MSGID_SEQ bigint null,
EXPIRATION bigint null,
MSG blob null,
PRIORITY bigint null,
XID varchar(250) null
); create index ACTIVEMQ_MSGS_CIDX
on ACTIVEMQ_MSGS (CONTAINER);
create index ACTIVEMQ_MSGS_EIDX
on ACTIVEMQ_MSGS (EXPIRATION);
create index ACTIVEMQ_MSGS_MIDX
on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
create index ACTIVEMQ_MSGS_PIDX
on ACTIVEMQ_MSGS (PRIORITY);
create index ACTIVEMQ_MSGS_XIDX
on ACTIVEMQ_MSGS (XID);

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

坑:

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

JDBC 改进: 加入高速缓存机制 Journal

高速缓存在 activemq.xml 中的配置:

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

⑤代码运行验证

一定要开启持久化 :  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

(1)队列Queue:

生产者:

      。。。。。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) { 。。。。。。。。。。。

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

运行结果:

在点对点类型中

  • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
  • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

而且点对点类型中消息一旦被Consumer消费,就从数据中删除
 
消费前的消息,会被存放到数据库

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

上面的消息被消费后被MQ自动删除

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

(2)主题Topic

        。。。。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("我是生产者张三");
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) {
。。。。。。

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

 在点对点类型中

  • 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
  • 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。

而且点对点类型中消息一旦被Consumer消费,就从数据中删除
 
消费前的消息,会被存放到数据库

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

上面的消息被消费后被MQ自动删除

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

3、JDBC Message store with ActiveMQ Journal(重点)

3.1 定义:

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

3.2 说明

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
 
当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:

生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

3.3 配置方式:

原来的配置:

<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" />
</persistenceAdapter>

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

修改的结果:

<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="5"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="../activemq-data" />
</persistenceFactory>

ActiveMQ 笔记(六)ActiveMQ的消息存储和持久化

以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

小总结:

  1. 持久化消息是指:
  • MQ 所在的服务器down 了消息也不会丢失

2.持久化机制演化过程:

  • 从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。

3. ActiveMQ 消息持久化机制有:

AMQ 基于日志文件
KahaDB 基于日志文件,5.4 之后的默认持久化
JDBC 基于第三方数据库
LevelDB 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store

从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

但是无论使用哪种持久化方式,消息的存储逻辑都一样

上一篇:IOC的理解


下一篇:activeMq笔记