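1. Storage structure
- commitLog
The message store file. Messages of all topics are stored in the commitLog files.
- consumeQueue
Each MessageQueue has a corresponding ConsumeQueue file. It covers every message of that queue but does not hold the full message data, only each message's offset in the commitLog, so it acts as an index file. After a message reaches the commitLog, it is dispatched asynchronously to the consumeQueue for consumers to consume.
- indexFile
Stores keys such as msgId and msgKey mapped to the commitLog offset, so that messages can be looked up quickly by msgId or msgKey.
- Transaction state service
Stores the transaction state of each message.
- Scheduled message service
Each delay level corresponds to one consume queue; the service stores the pull progress of the delay queues.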
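2. Storage flow
- broker-processor-SendMessageProcessor
The broker's message handling logic lives in the processor package. The class that handles messages sent by producers is SendMessageProcessor. Its entry point is processRequest(), which calls asyncSendMessage() and then asyncPutMessage():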
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) -> {
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
});
return putResultFuture;
}
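- checkStoreStatus(): checks whether the MessageStore is available. Writes are rejected when the broker has been shut down or when this broker is a slave. printTimes is only a counter that throttles the rejection warning to one log line per 50,000 rejected writes; it is not a rejection condition in itself.
- checkMessage(): validates the message. A topic longer than 127 is rejected, and so is a properties string longer than 32767.
- Write the data to the commitLog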
commitLog.asyncPutMessage(final MessageExtBrokerInner msg){
// part of the code is omitted
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
}
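Sets the message's store timestamp while holding the lock. If mappedFile is null, no file exists yet under the commitLog directory and this is the first message ever stored; if it is full, the current file has no space left. In both cases a new file is requested, named after its starting offset and left-padded with zeros to 20 digits. If creation fails, a likely cause is insufficient disk space, and CREATE_MAPEDFILE_FAILED is returned.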
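The naming rule can be illustrated with a minimal sketch. The class and method names below are hypothetical, the 1 GiB file size is an assumption for the example, and String.format is used here for brevity rather than whatever utility RocketMQ itself uses.

```java
// Illustrative sketch only; not RocketMQ's own utility class.
final class CommitLogFileNames {

    /** Formats a file's starting offset as a 20-digit, zero-padded file name. */
    static String offsetToFileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    /** Parses the starting offset back out of such a file name. */
    static long fileNameToOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // assumed size of one commitLog file
        System.out.println(offsetToFileName(0));        // name of the first file
        System.out.println(offsetToFileName(fileSize)); // name of the second file
    }
}
```

Because the name is the starting offset, the file that contains any global offset can be found by simple arithmetic on the sorted file list.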
- appendMessage()
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
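First reads the MappedFile's current write pointer. If it is greater than or equal to the file size, the file is already full and an AppendMessageResult with status UNKNOWN_ERROR is returned (no exception is thrown). Otherwise slice() creates a ByteBuffer that shares the MappedFile's memory region, and its position is set to the current write pointer before the callback appends into it.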
- doAppend()
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
this.resetByteBuffer(storeHostHolder, storeHostLength);
String msgId;
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
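Creates a globally unique message ID, 16 bytes in total for an IPv4 store host:
4 bytes IP + 4 bytes port + 8 bytes commitLog offset
For readability the msgId is converted to a string; it can later be converted back to a byte array so that the message can be located by its offset.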
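A minimal sketch of that 16-byte layout follows. It assumes an IPv4 address and upper-case hex encoding; MsgIdSketch and its methods are hypothetical names, not MessageDecoder's API.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: 4-byte IP + 4-byte port + 8-byte commitLog offset.
final class MsgIdSketch {

    /** Encodes the store host and commitLog offset as a 32-character hex string. */
    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("expected a 4-byte IPv4 address");
        }
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ipv4);                // 4 bytes
        buf.putInt(port);             // 4 bytes
        buf.putLong(commitLogOffset); // 8 bytes
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    /** Recovers the commitLog offset from the last 8 bytes (16 hex characters). */
    static long decodeOffset(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(16), 16);
    }

    public static void main(String[] args) {
        String id = encode(new byte[] {10, 0, 0, 1}, 10911, 1024L);
        System.out.println(id + " -> offset " + decodeOffset(id));
    }
}
```

Since the offset is embedded in the ID, a lookup by msgId needs no index: the broker address and the physical position are both recoverable from the string.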
- calMsgLength
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
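final int msgLen = 4 //TOTALSIZE total length of this message entry, in bytes
+ 4 //MAGICCODE magic number, fixed value daa320a7
+ 4 //BODYCRC CRC checksum of the message body
+ 4 //QUEUEID consume queue ID
+ 4 //FLAG
+ 8 //QUEUEOFFSET offset of the message in its consume queue
+ 8 //PHYSICALOFFSET offset of the message in the CommitLog file
+ 4 //SYSFLAG system flag, e.g. whether the body is compressed or the message is transactional
+ 8 //BORNTIMESTAMP timestamp at which the producer called the send API
+ bornhostLength //BORNHOST producer IP + port
+ 8 //STORETIMESTAMP time the message was stored
+ storehostAddressLength //STOREHOSTADDRESS broker IP + port
+ 4 //RECONSUMETIMES number of consume retries
+ 8 //Prepared Transaction Offset offset of the prepared transactional message
+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY 4-byte body length + body
+ 1 + topicLength //TOPIC 1-byte topic length + topic
+ 2 + (propertiesLength > 0 ? propertiesLength : 0) //PROPERTIES 2-byte properties length + properties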
+ 0;
return msgLen;
}
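The total message length is computed from the body length and the other fields according to the storage format above.
If the message is longer than the space remaining in the current commitLog file, a new file has to be created to store it.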
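As a worked example of the formula, the sketch below recomputes the length with plain booleans in place of the MessageSysFlag bits and adds a simple remaining-space check. The class, the boolean parameters, and fitsInCurrentFile are hypothetical; the real check may also reserve some space at the end of the file.

```java
// Illustrative sketch only; mirrors the field sizes listed in calMsgLength.
final class MsgLengthSketch {

    static int calMsgLength(boolean bornHostV6, boolean storeHostV6,
                            int bodyLength, int topicLength, int propertiesLength) {
        int bornHostLength = bornHostV6 ? 20 : 8;   // IP (16 or 4) + port (4)
        int storeHostLength = storeHostV6 ? 20 : 8;
        return 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8   // TOTALSIZE .. BORNTIMESTAMP
            + bornHostLength
            + 8                                     // STORETIMESTAMP
            + storeHostLength
            + 4 + 8                                 // RECONSUMETIMES, prepared transaction offset
            + 4 + Math.max(bodyLength, 0)
            + 1 + topicLength
            + 2 + Math.max(propertiesLength, 0);
    }

    /** Simplified check: does the message fit in what is left of the current file? */
    static boolean fitsInCurrentFile(int msgLen, int fileSize, int wrotePosition) {
        return msgLen <= fileSize - wrotePosition;
    }

    public static void main(String[] args) {
        // 100-byte body, 9-byte topic ("TopicTest"), 20 bytes of properties, IPv4 hosts
        int len = calMsgLength(false, false, 100, 9, 20);
        System.out.println("msgLen = " + len);
    }
}
```

With IPv4 hosts the fixed overhead is 91 bytes, so the example message occupies 91 + 100 + 9 + 20 = 220 bytes.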
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
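The message content is written into the ByteBuffer and an AppendMessageResult is then created. At this point the message sits only in the memory-mapped buffer of the MappedFile; it has not been flushed to disk.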
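The separation between appending and flushing can be reproduced with plain JDK NIO. This is a minimal sketch under assumed names (MmapAppendSketch, append, demo) and a 4 KB temporary file; it is not RocketMQ's MappedFile.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only; class and method names are hypothetical.
final class MmapAppendSketch {

    /** Copies data into the mapped region at wrotePosition; nothing is forced to disk. */
    static int append(MappedByteBuffer mapped, int wrotePosition, byte[] data) {
        ByteBuffer slice = mapped.slice();   // shares memory with the mapping
        slice.position(wrotePosition);
        slice.put(data);
        return wrotePosition + data.length;  // new write pointer
    }

    static String demo() {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                int pos = append(mapped, 0, "hello".getBytes(StandardCharsets.UTF_8));
                pos = append(mapped, pos, "world".getBytes(StandardCharsets.UTF_8));
                mapped.force();              // the separate, explicit flush step
                byte[] back = new byte[pos];
                mapped.slice().get(back);    // read back through the same mapping
                return new String(back, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The append path touches only memory, which is why it is fast; durability depends entirely on when force() is called.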
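3. Storage file organization and memory mapping
- MappedFileQueue
Manages all MappedFile instances; it is a wrapper around a storage directory.
1 ) String storePath: the storage directory
2 ) int mappedFileSize: the size of a single file
3 ) CopyOnWriteArrayList mappedFiles: the collection of MappedFile objects
4 ) AllocateMappedFileService allocateMappedFileService: the service class that creates MappedFile instances
5 ) long flushedWhere = 0: the current flush pointer; all data before it has been persisted to disk
6 ) long committedWhere = 0: the current commit pointer, i.e. the write pointer of the in-memory ByteBuffer; this value is greater than or equal to flushedWhere
- MappedFile
1 ) int OS_PAGE_SIZE: operating system page size, 4 KB by default
2 ) AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY: virtual memory mapped by MappedFile objects in the current JVM instance
3 ) AtomicInteger TOTAL_MAPPED_FILES: number of MappedFile objects in the current JVM instance
4 ) AtomicInteger wrotePosition: write pointer of this file, starting from 0 (the write pointer within the memory-mapped file)
5 ) AtomicInteger committedPosition: commit pointer of this file. If transientStorePoolEnable is on, data is first stored in the TransientStorePool, then committed to the file channel backing the memory-mapped file, and then flushed to disk
6 ) AtomicInteger flushedPosition: flush pointer; data before this pointer has been persisted to disk
7 ) int fileSize: file size
8 ) FileChannel fileChannel: file channel
9 ) ByteBuffer writeBuffer: an off-heap (direct) ByteBuffer. If it is not null, data is first stored in this buffer and then committed to the file behind the MappedFile. It is non-null when transientStorePoolEnable is true
10 ) TransientStorePool transientStorePool: off-heap buffer pool, enabled when transientStorePoolEnable is true
11 ) String fileName: file name
12 ) long fileFromOffset: starting offset of this file
13 ) File file: the physical file
14 ) MappedByteBuffer mappedByteBuffer: the memory-mapped buffer for the physical file
15 ) volatile long storeTimestamp: time of the last write to this file
16 ) boolean firstCreateInQueue: whether this is the first file in the MappedFileQueue
- The commitLog service runs a thread that, through mappedFileQueue.commit(), executes the MappedFile commit method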
@Override
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
- commit method (commits the data in the MappedFile's writeBuffer to the FileChannel)
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
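Performs the commit. commitLeastPages is the minimum number of pages for this commit: if the data waiting to be committed spans fewer than commitLeastPages pages, nothing is committed in this round and the data waits for the next one. If writeBuffer is null, the method returns the wrotePosition pointer directly without committing anything, which shows that the commit operation acts on writeBuffer.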
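The "at least commitLeastPages pages" rule can be sketched as a pure function. The method below is an assumption about how such a check could look, with the pointers passed in as plain ints and a 4 KB page size; it is not copied from the MappedFile source.

```java
// Illustrative sketch only; names and the page size constant are assumptions.
final class CommitThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    static boolean isAbleToCommit(int wrotePosition, int committedPosition,
                                  int fileSize, int commitLeastPages) {
        if (wrotePosition == fileSize) {
            return true; // the file is full: commit whatever is left
        }
        if (commitLeastPages > 0) {
            // count whole pages between the commit pointer and the write pointer
            int dirtyPages = wrotePosition / OS_PAGE_SIZE - committedPosition / OS_PAGE_SIZE;
            return dirtyPages >= commitLeastPages;
        }
        // commitLeastPages == 0 means "commit anything that is pending"
        return wrotePosition > committedPosition;
    }

    public static void main(String[] args) {
        int fileSize = 1 << 20;
        System.out.println(isAbleToCommit(3 * OS_PAGE_SIZE, 0, fileSize, 4));
        System.out.println(isAbleToCommit(4 * OS_PAGE_SIZE, 0, fileSize, 4));
    }
}
```

Passing 0 as commitLeastPages, as the commit thread does once the thorough interval has elapsed, forces out any pending data regardless of how little there is.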
- After a commit, flushCommitLogService is woken up; the flush itself is done by flush()
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
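- Flushing to disk
flush() directly calls force() on either fileChannel or mappedByteBuffer to persist the in-memory data to disk, and then sets flushedPosition to getReadPosition(). If writeBuffer is not null, flushedPosition should equal the last commit pointer, because the data from the last commit is exactly the data that has reached the file. If writeBuffer is null, data goes straight into the MappedByteBuffer and wrotePosition is the pointer within the MappedByteBuffer, so flushedPosition is set to wrotePosition.
- MappedFile destruction
Messages are kept for a fixed retention period and deleted once they expire. These notes originally gave 48 hours here, while section 4.4 gives the fileReservedTime default as 72 hours; the effective value is whatever fileReservedTime is set to in the broker configuration.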
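4. RocketMQ storage files
1 ) commitlog: message storage directory
2 ) config: configuration data kept at runtime, mainly the following files
consumerFilter.json: topic message filter information
consumerOffset.json: consume progress in cluster consumption mode
delayOffset.json: pull progress of the delay message queues
subscriptionGroup.json: consumer group configuration
topics.json: topic configuration attributes
3 ) consumequeue: consume queue storage directory
4 ) index: message index file storage directory
5 ) abort: if an abort file exists, the Broker was shut down abnormally. The file is created when the broker starts and deleted before a normal exit
6 ) checkpoint: the checkpoint file, which stores the timestamps of the last commitlog flush, the last consumequeue flush, and the last index file flush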
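4.1 Updating ConsumeQueue and IndexFile in real time
When the Broker starts, it starts a ReputMessageService thread and initializes a very important parameter, reputFromOffset, which is the offset from which the thread starts dispatching messages to ConsumeQueue and IndexFile. If duplicate dispatch is allowed, reputFromOffset is set to the commitLog's commit pointer; otherwise it is set to the commitLog's maximum in-memory offset.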
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
- doReput()
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
1. Returns all valid data starting from the reputFromOffset offset
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// read the data in a loop
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// validate the message and return its size
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// a DispatchRequest has been built; if the message length is greater than 0, doDispatch() is called
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
2. The actual dispatchers are CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex
- ConsumeQueue implementation
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
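putMessagePositionInfo()
Writes the message's commitLog offset, its length, and its tag hash code into a byteBuffer in that order, computes the physical position within the ConsumeQueue from the consumeQueueOffset, and appends the entry to the memory-mapped file (a MappedFile). No flush to disk happens at this point.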
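A ConsumeQueue entry can be sketched as a fixed-size record. The sketch assumes a 20-byte unit (8-byte commitLog offset, 4-byte size, 8-byte tag hash code); the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only; assumes a 20-byte entry layout.
final class ConsumeQueueEntrySketch {
    static final int UNIT_SIZE = 20;

    /** Encodes one entry in the order: commitLog offset, message size, tag hash code. */
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        buf.flip(); // make the 20 bytes readable
        return buf;
    }

    /** Logical queue offset (entry index) to byte position within the queue's files. */
    static long physicalPosition(long consumeQueueOffset) {
        return consumeQueueOffset * UNIT_SIZE;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 220, "TagA".hashCode());
        System.out.println(entry.remaining() + " bytes, stored at position " + physicalPosition(3));
    }
}
```

Fixed-size entries are what make the queue behave like an array: the n-th message of a queue is found by multiplication, with no search.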
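- IndexFile
If messageIndexEnable is set to true, IndexService.buildIndex is called to build the hash index; otherwise this dispatch is ignored.
1. Gets or creates the IndexFile and reads the maximum physical offset recorded in the index files. If the message's physical offset is smaller than that value, the message is a duplicate.
2. If the message's unique key is not empty, it is added to the hash index so that the message can be retrieved quickly.
3. Builds the index keys. One message may have several index keys, separated by spaces.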
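Step 3 can be sketched as follows. The "topic#key" format and the slot computation are assumptions made for illustration, and IndexKeySketch is a hypothetical class; only the space-separated keys come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the key format and hash-to-slot rule are assumptions.
final class IndexKeySketch {

    /** Splits a space-separated keys string and prefixes each key with the topic. */
    static List<String> buildIndexKeys(String topic, String keys) {
        List<String> result = new ArrayList<>();
        if (keys == null) {
            return result;
        }
        for (String key : keys.split(" ")) {
            if (!key.isEmpty()) {          // skip blanks caused by repeated spaces
                result.add(topic + "#" + key);
            }
        }
        return result;
    }

    /** Maps an index key to a hash slot in [0, slotCount). */
    static int slotFor(String indexKey, int slotCount) {
        int nonNegativeHash = indexKey.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % slotCount;
    }

    public static void main(String[] args) {
        for (String k : buildIndexKeys("TopicTest", "order-1 order-2")) {
            System.out.println(k + " -> slot " + slotFor(k, 5_000_000));
        }
    }
}
```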
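4.2 Recovering consume queues and index files
The Broker stores every message in the commitLog, and a separate thread asynchronously dispatches them to ConsumeQueue and IndexFile. If the Broker crashes before a dispatch completes, the commitLog, ConsumeQueue, and IndexFile become inconsistent: some messages exist in the commitLog but not in the ConsumeQueue and, unless the files are repaired, will never be consumed.
- DefaultMessageStore - load();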
public boolean load() {
boolean result = true;
try {
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// load the scheduled (delay) messages
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// load Commit Log
result = result && this.commitLog.load();
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
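Determines whether the last exit was normal. The Broker creates an abort file at startup, and a shutdown hook deletes it on exit. If the abort file is still present at the next startup, the previous exit was abnormal and the files need to be repaired.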
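The abort-file mechanism is easy to reproduce in isolation. This is a minimal sketch using java.nio.file; AbortFileSketch and its methods are hypothetical names, and demo() simulates a crash simply by not calling the shutdown method before checking again.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch only; not the broker's actual startup code.
final class AbortFileSketch {
    private final Path abortFile;

    AbortFileSketch(Path storeRootDir) {
        this.abortFile = storeRootDir.resolve("abort");
    }

    /** True if no abort file was left behind by the previous run. */
    boolean lastExitOk() {
        return !Files.exists(abortFile);
    }

    void onStartup() {
        try {
            if (!Files.exists(abortFile)) {
                Files.createFile(abortFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void onNormalShutdown() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns {fresh store, after simulated crash, after clean exit}. */
    static boolean[] demo() {
        try {
            Path root = Files.createTempDirectory("store-sketch");
            AbortFileSketch broker = new AbortFileSketch(root);
            boolean fresh = broker.lastExitOk();
            broker.onStartup();
            boolean afterCrash = new AbortFileSketch(root).lastExitOk(); // abort file still there
            broker.onNormalShutdown();
            boolean afterCleanExit = new AbortFileSketch(root).lastExitOk();
            Files.deleteIfExists(root);
            return new boolean[] {fresh, afterCrash, afterCleanExit};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real process onNormalShutdown would typically be registered through Runtime.getRuntime().addShutdownHook, so a killed or crashed JVM leaves the marker in place.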
- MappedFileQueue - load();
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
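1. Loads the commitLog files, sorted by file name. If a file's size differs from the configured size of a single file, load() logs a warning and returns false, so that file and every file after it are not loaded.
2. Loads the consume queues in the same way and builds the ConsumeQueue objects.
3. Loads the store checkpoint, which records the flush points of the three kinds of files; they are written again at the next flush.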
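4.2.1 Recovery after a normal Broker shutdown
- CommitLog.recoverNormally()
Recovery starts from the third-to-last file, or from the first file if there are fewer than three. The CommitLog files are traversed one message at a time; a message is considered valid if the check succeeds and its length is greater than 0.
4.2.2 Recovery after an abnormal Broker shutdown
- CommitLog.recoverAbnormally()
Walks backwards from the last file until it finds the first file that was stored correctly.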
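4.3 Flush mechanism
RocketMQ's storage reads and writes are built on the JDK NIO memory-mapping mechanism (MappedByteBuffer). A message is first appended to memory and then flushed to disk at a time determined by the configured flush strategy. With synchronous flush, MappedByteBuffer.force() is called synchronously after the message has been appended to memory. With asynchronous flush, the broker returns to the sender as soon as the message is in memory, and a separate thread performs the flush at a configured frequency. CommitLog, ConsumeQueue, and IndexFile are flushed in similar ways, except that IndexFile is flushed whenever an index update is written.
4.3.1 Synchronous flush
After the message has been appended to the memory-mapped file, the data is flushed from memory to the disk file immediately. This is implemented by handleDiskFlush in CommitLog.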
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
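1. Builds a GroupCommitRequest and submits it to the GroupCommitService
2. Waits for the synchronous flush to complete. If it times out (5 s), FLUSH_DISK_TIMEOUT is returned; if the flush succeeds, the call returns normally to the caller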
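The request/wait pattern in these two steps can be sketched with a CompletableFuture. All names below (SyncFlushSketch, FlushRequest, completeIfFlushed, waitForFlush) are hypothetical stand-ins for GroupCommitRequest and the waiting code in handleDiskFlush.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch only; simplified stand-in for the group commit handshake.
final class SyncFlushSketch {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

    /** One pending flush: "everything up to nextOffset must be on disk". */
    static final class FlushRequest {
        final long nextOffset;
        final CompletableFuture<Status> future = new CompletableFuture<>();

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    /** Called by the flush thread after a flush; completes the request if it is covered. */
    static void completeIfFlushed(FlushRequest request, long flushedOffset) {
        if (flushedOffset >= request.nextOffset) {
            request.future.complete(Status.PUT_OK);
        }
    }

    /** Called by the sender thread; a timeout or failure is reported as FLUSH_DISK_TIMEOUT. */
    static Status waitForFlush(FlushRequest request, long timeoutMillis) {
        try {
            return request.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.FLUSH_DISK_TIMEOUT;
        } catch (ExecutionException | TimeoutException e) {
            return Status.FLUSH_DISK_TIMEOUT;
        }
    }
}
```

Grouping works because one physical flush can complete many requests at once: every request whose nextOffset is at or below the flushed offset is satisfied by the same force() call.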
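- GroupCommitService thread processing
The thread blocks in waitForRunning() until it is notified or the interval elapses, then processes the pending flush requests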
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
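After all flush requests have been processed, physicMsgTimestamp in the storeCheckpoint is updated. The checkpoint itself is not flushed here; that happens when the consume queue files are flushed.
Summary: once the broker has appended a producer's message to the memory-mapped file (memory), it must synchronously flush that memory to disk. Calling force() on the memory-mapped file writes the in-memory data to disk.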
4.3.2 Asynchronous flush
The message is appended to memory first, and a flush thread periodically writes the data to disk.
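A minimal sketch of that division of labour: append() returns immediately, and a background thread advances the flush pointer at a fixed interval. The class is hypothetical, and flushOnce() only moves a counter where a real store would call force().

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only; pointers are modelled as counters, no real I/O is done.
final class AsyncFlushSketch {
    private final AtomicLong wrotePosition = new AtomicLong();
    private final AtomicLong flushedPosition = new AtomicLong();

    /** The sender path: advance the write pointer and return at once. */
    long append(int bytes) {
        return wrotePosition.addAndGet(bytes);
    }

    /** One flush round; a real store would call force() before moving the pointer. */
    void flushOnce() {
        long target = wrotePosition.get();
        flushedPosition.set(target);
    }

    /** Data that would be lost if the machine lost power right now. */
    long unflushedBytes() {
        return wrotePosition.get() - flushedPosition.get();
    }

    /** Starts a daemon thread that flushes every intervalMillis until interrupted. */
    Thread startFlusher(long intervalMillis) {
        Thread flusher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                flushOnce();
            }
        }, "flush-sketch");
        flusher.setDaemon(true);
        flusher.start();
        return flusher;
    }
}
```

The trade-off is visible in unflushedBytes(): asynchronous flush gives lower send latency at the cost of a window of data that exists only in memory.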
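4.4 Expired file deletion
If a file other than the one currently being written has not been updated within a certain period, it is considered expired and may be deleted. RocketMQ does not check whether all messages in the file have been consumed. The default expiry of each file is 72 hours; it can be changed by setting fileReservedTime (in hours) in the Broker configuration file.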
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
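cleanFilesPeriodically is scheduled every 10 s to check whether expired files need to be removed. The interval is set by cleanResourceInterval and defaults to 10 s.
The method cleans up both commitLog and consumeQueue files; the two kinds of file share one deletion mechanism.
- CleanCommitLogService - deleteExpiredFiles()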
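// File retention time: a file is expired once the time since its last update exceeds this value
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// Interval between physical file deletions: one pass may delete more than one file, and this value sets the pause between two deletions
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// If a file is still held by other threads, the deletion is refused and the timestamp of that first attempt is recorded; this value is the longest the file may survive after that first refusal
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// Deletion proceeds if any one of the following three conditions holds
boolean timeup = this.isTimeToDelete(); // scheduled deletion time reached; 4 a.m. by default, configurable in broker.conf
boolean spacefull = this.isSpaceToDelete(); // disk space is running low
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; // reserved for manual triggering via executeDeleteFilesManually; not exposed yet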
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
- isSpaceToDelete
private boolean isSpaceToDelete() {
// maximum share of the disk that commitLog and consumeQueue files may use
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
cleanImmediately = false;
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
// disk usage of the commitLog partition
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
// above 0.9 the disk is nearly full, so it is marked as not writable
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}
cleanImmediately = true;
// above 0.85 immediate cleanup is requested, but writes are not rejected
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
}
}
- Delete operation
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
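// ------------------------------------------------ file destruction and deletion
// If the file has passed the 72-hour retention or the disk-space condition requires it, MappedFile.destroy() is called to delete it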
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
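The decision made inside that loop can be isolated as a small, testable function. The sketch below is a simplified model with hypothetical names; it takes last-modified timestamps ordered oldest first and ignores the batch limit, the pause between deletions, and the case where destroy() is refused.

```java
// Illustrative sketch only; models the expiry decision, not the file deletion itself.
final class ExpiredFileSketch {
    static final long HOUR_MILLIS = 60L * 60 * 1000;

    static boolean shouldDelete(long lastModifiedMillis, long fileReservedHours,
                                long nowMillis, boolean cleanImmediately) {
        long liveMaxTimestamp = lastModifiedMillis + fileReservedHours * HOUR_MILLIS;
        return nowMillis >= liveMaxTimestamp || cleanImmediately;
    }

    /** Counts deletable files from the oldest onwards, stopping at the first one that must stay. */
    static int countDeletable(long[] lastModifiedOldestFirst, long fileReservedHours,
                              long nowMillis, boolean cleanImmediately) {
        int count = 0;
        for (long lastModified : lastModifiedOldestFirst) {
            if (!shouldDelete(lastModified, fileReservedHours, nowMillis, cleanImmediately)) {
                break; // avoid deleting files in the middle of the sequence
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long now = 100 * HOUR_MILLIS;
        long[] files = {10 * HOUR_MILLIS, 20 * HOUR_MILLIS, 50 * HOUR_MILLIS, 15 * HOUR_MILLIS};
        System.out.println(countDeletable(files, 72, now, false));
    }
}
```

Stopping at the first surviving file keeps the remaining files a contiguous range of offsets, which is why the fourth file in the example is left alone even though it is old enough.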