RocketMQ源码分析(三)消息存储

1.4 消息存储

1.4.1 消息存储核心类

RocketMQ源码分析(三)消息存储

private final MessageStoreConfig messageStoreConfig;	//消息配置属性
private final CommitLog commitLog;		//CommitLog文件存储的实现类
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	//消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService;	//消息队列文件刷盘线程
private final CleanCommitLogService cleanCommitLogService;	//清除CommitLog文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;	//清除ConsumerQueue队列文件服务
private final IndexService indexService;	//索引实现类
private final AllocateMappedFileService allocateMappedFileService;	//MappedFile分配服务
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final HAService haService;	//存储HA机制
private final ScheduleMessageService scheduleMessageService;	//消息服务调度线程
private final StoreStatsService storeStatsService;	//消息存储服务
private final TransientStorePool transientStorePool;	//消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager;	//Broker状态管理器
private final MessageArrivingListener messageArrivingListener;	//消息拉取长轮询模式消息达到监听器
private final BrokerConfig brokerConfig;	//Broker配置类
private StoreCheckpoint storeCheckpoint;	//文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList;	//CommitLog文件转发请求
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1.4.2 消息存储流程

RocketMQ源码分析(三)消息存储

消息存储入口:DefaultMessageStore#putMessage

//判断Broker角色如果是从节点,则无需写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

    return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

//判断当前写入状态如果是正在写入,则不能继续
if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
    	return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
    this.printTimes.set(0);
}
//判断消息主题长度是否超过最大限制
if (msg.getTopic().length() > Byte.MAX_VALUE) {
    log.warn("putMessage message topic length too long " + msg.getTopic().length());
    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
//判断消息属性长度是否超过限制
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
    return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
//判断系统PageCache缓存去是否占用
if (this.isOSPageCacheBusy()) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

//将消息写入CommitLog文件
PutMessageResult result = this.commitLog.putMessage(msg);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

代码:CommitLog#putMessage

//记录消息存储时间
msg.setStoreTimestamp(beginLockTimestamp);

//判断如果mappedFile如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}
//如果创建失败,直接返回
if (null == mappedFile) {
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

//写入消息到mappedFile中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

代码:MappedFile#appendMessagesInner

//获得文件的写入指针
int currentPos = this.wrotePosition.get();

//如果指针大于文件大小则直接返回
if (currentPos < this.fileSize) {
    //通过writeBuffer.slice()创建一个与MappedFile共享的内存区,并设置position为当前指针
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) {
       	//通过回调方法写入
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

代码:CommitLog#doAppend

//文件写入位置
long wroteOffset = fileFromOffset + byteBuffer.position();
//设置消息ID
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

//获得该消息在消息队列中的偏移量
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

//获得消息属性长度
final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

if (propertiesLength > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long. length={}", propertiesData.length);
    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}

//获得消息主题大小
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;

//获得消息体大小
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//计算消息总长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

代码:CommitLog#calMsgLength

protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE  
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + 8 //BORNHOST
        + 8 //STORETIMESTAMP
        + 8 //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

代码:CommitLog#doAppend

//消息长度不能超过4M
if (msgLen > this.maxMessageSize) {
    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
        + ", maxMessageSize: " + this.maxMessageSize);
    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

//消息是如果没有足够的存储空间则新创建CommitLog文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(maxBlank);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
    // 3 The remaining space may be any value
    // Here the length of the specially set maxBlank
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

//将消息存储到ByteBuffer中,返回AppendMessageResult
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, 
                                                     msgLen, msgId,msgInner.getStoreTimestamp(), 
                                                     queueOffset, 
                                                     CommitLog.this.defaultMessageStore.now() 
                                                     -beginTimeMills);
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        //更新消息队列偏移量
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        break;
    default:
        break;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

代码:CommitLog#putMessage

//释放锁
putMessageLock.unlock();
//刷盘
handleDiskFlush(result, putMessageResult, msg);
//执行HA主从同步
handleHA(result, putMessageResult, msg);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.4.3 存储文件

  • commitLog:消息存储目录
  • config:运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命Broker非正常关闭
  • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

1.4.4 存储文件内存映射

RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

1)MappedFileQueue
RocketMQ源码分析(三)消息存储

String storePath;	//存储目录
int mappedFileSize;	// 单个文件大小
CopyOnWriteArrayList<MappedFile> mappedFiles;	//MappedFile文件集合
AllocateMappedFileService allocateMappedFileService;	//创建MapFile服务类
long flushedWhere = 0;		//当前刷盘指针
long committedWhere = 0;	//当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushWhere
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 根据存储时间查询MappedFile
public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);
	
    if (null == mfs)
        return null;
	//遍历MappedFile文件数组
    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        //MappedFile文件的最后修改时间大于指定时间戳则返回该文件
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 根据消息偏移量offset查找MappedFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        //获得第一个MappedFile文件
        MappedFile firstMappedFile = this.getFirstMappedFile();
        //获得最后一个MappedFile文件
        MappedFile lastMappedFile = this.getLastMappedFile();
        //第一个文件和最后一个文件均不为空,则进行处理
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || 
                offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
            } else {
                //获得文件索引
                int index = (int) ((offset / this.mappedFileSize) 
                                   - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    //根据索引返回目标文件
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 获取存储文件最小偏移量
public long getMinOffset() {

    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 获取存储文件最大偏移量
public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 返回存储文件当前写指针
public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2)MappedFile

RocketMQ源码分析(三)消息存储

int OS_PAGE_SIZE = 1024 * 4;		//操作系统每页大小,默认4K
AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);	//当前JVM实例中MappedFile虚拟内存
AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);	//当前JVM实例中MappedFile对象个数
AtomicInteger wrotePosition = new AtomicInteger(0);	//当前文件的写指针
AtomicInteger committedPosition = new AtomicInteger(0);	//当前文件的提交指针
AtomicInteger flushedPosition = new AtomicInteger(0);	//刷写到磁盘指针
int fileSize;	//文件大小
FileChannel fileChannel;	//文件通道	
ByteBuffer writeBuffer = null;	//堆外内存ByteBuffer
TransientStorePool transientStorePool = null;	//堆外内存池
String fileName;	//文件名称
long fileFromOffset;	//该文件的处理偏移量
File file;	//物理文件
MappedByteBuffer mappedByteBuffer;	//物理文件对应的内存映射Buffer
volatile long storeTimestamp = 0;	//文件最后一次内容写入时间
boolean firstCreateInQueue = false;	//是否是MappedFileQueue队列中第一个文件
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

MappedFile初始化

  • 未开启transientStorePoolEnabletransientStorePoolEnable=truetrue表示数据先存储到堆外内存,然后通过Commit线程将数据提交到内存映射Buffer中,再通过Flush线程将内存映射Buffer中数据持久化磁盘。
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
	
    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

开启transientStorePoolEnable

public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer();	//初始化writeBuffer
    this.transientStorePool = transientStorePool;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

MappedFile提交

提交数据到FileChannel,commitLeastPages为本次提交最小的页数,如果待提交数据不满commitLeastPages,则不执行本次提交操作。如果writeBuffer如果为空,直接返回writePosition指针,无需执行commit操作,表名commit操作主体是writeBuffer。

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    //判断是否满足提交条件
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // 所有数据提交后,清空缓冲区
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

MappedFile#isAbleToCommit

判断是否执行commit操作,如果文件已满返回true;如果commitLeastpages大于0,则比较writePosition与上一次提交的指针commitPosition的差值,除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages则返回true,如果commitLeastpages小于0表示只要存在脏页就提交。

protected boolean isAbleToCommit(final int commitLeastPages) {
    //已经刷盘指针
    int flush = this.committedPosition.get();
    //文件写指针
    int write = this.wrotePosition.get();
	//写满刷盘
    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        //文件内容达到commitLeastPages页数,则刷盘
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

MappedFile#commit0

具体提交的实现,首先创建WriteBuffer区共享缓存区,然后将新创建的position回退到上一次提交的位置(commitPosition),设置limit为wrotePosition(当前最大有效数据指针),然后把commitPosition到wrotePosition的数据写入到FileChannel中,然后更新committedPosition指针为wrotePosition。commit的作用就是将MappedFile的writeBuffer中数据提交到文件通道FileChannel中。

protected void commit0(final int commitLeastPages) {
    //写指针
    int writePos = this.wrotePosition.get();
    //上次提交指针
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            //复制共享内存区域
            ByteBuffer byteBuffer = writeBuffer.slice();
            //设置提交位置是上次提交位置
            byteBuffer.position(lastCommittedPosition);
            //最大提交数量
            byteBuffer.limit(writePos);
            //设置fileChannel位置为上次提交位置
            this.fileChannel.position(lastCommittedPosition);
            //将lastCommittedPosition到writePos的数据复制到FileChannel中
            this.fileChannel.write(byteBuffer);
            //重置提交位置
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

MappedFile#flush

刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;如果writeBuffer不为空,则flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer中的数据;如果writeBuffer为空,数据时直接进入到MappedByteBuffer,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。
RocketMQ源码分析(三)消息存储

public int flush(final int flushLeastPages) {
    //数据达到刷盘条件
    if (this.isAbleToFlush(flushLeastPages)) {
        //加锁,同步刷盘
        if (this.hold()) {
            //获得读指针
            int value = getReadPosition();
            try {
                //数据从writeBuffer提交数据到fileChannel再刷新到磁盘
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    //从mmap刷新数据到磁盘
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
			//更新刷盘位置
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

MappedFile#getReadPosition

获取当前文件最大可读指针。如果writeBuffer为空,则直接返回当前的写指针;如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设置中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据

public int getReadPosition() {
    //如果writeBuffer为空,刷盘的位置就是应该等于上次commit的位置,如果为空则为mmap的写指针
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
  • 1
  • 2
  • 3
  • 4

MappedFile#selectMappedBuffer

查找pos到当前最大可读之间的数据,由于在整个写入期间都未曾改MappedByteBuffer的指针,如果mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile,然后通过设置ByteBuffer的position为待查找的值,读取字节长度当前可读最大长度,最终返回的ByteBuffer的limit为size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作SelectMappedBufferResult不能对包含在里面的ByteBuffer调用filp方法。

public SelectMappedBufferResult selectMappedBuffer(int pos) {
    //获得最大可读指针
    int readPosition = getReadPosition();
    //pos小于最大可读指针,并且大于0
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            //复制mappedByteBuffer读共享区
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            //设置读指针位置
            byteBuffer.position(pos);
            //获得可读范围
            int size = readPosition - pos;
            //设置最大刻度范围
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

MappedFile#shutdown

MappedFile文件销毁的实现方法为public boolean destory(long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间。

public void shutdown(final long intervalForcibly) {
    if (this.available) {
        //关闭MapedFile
        this.available = false;
        //设置当前关闭时间戳
        this.firstShutdownTimestamp = System.currentTimeMillis();
        //释放资源
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

3)TransientStorePool

短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
RocketMQ源码分析(三)消息存储

private final int poolSize;		//availableBuffers个数
private final int fileSize;		//每隔ByteBuffer大小
private final Deque<ByteBuffer> availableBuffers;	//ByteBuffer容器。双端队列
  • 1
  • 2
  • 3

初始化

public void init() {
    //创建poolSize个堆外内存
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        //使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

        availableBuffers.offer(byteBuffer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.4.5 实时更新消息消费队列与索引文件

消息消费队文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在CommitLog文件中,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumerQueue、IndexFile文件。

RocketMQ源码分析(三)消息存储
RocketMQ源码分析(三)消息存储
代码:DefaultMessageStore:start

//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();
  • 1
  • 2
  • 3
  • 4

代码:DefaultMessageStore:run

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
	//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

代码:DefaultMessageStore:deReput

//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
	DispatchRequest dispatchRequest =                               DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
	int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

	if (dispatchRequest.isSuccess()) {
	    if (size > 0) {
	        DefaultMessageStore.this.doDispatch(dispatchRequest);
	    }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

DispatchRequest
RocketMQ源码分析(三)消息存储

String topic; //消息主题名称
int queueId;  //消息队列ID
long commitLogOffset;	//消息物理偏移量
int msgSize;	//消息长度
long tagsCode;	//消息过滤tag hashCode
long storeTimestamp;	//消息存储时间戳
long consumeQueueOffset;	//消息队列偏移量
String keys;	//消息索引key
boolean success;	//是否成功解析到完整的消息
String uniqKey;	//消息唯一键
int sysFlag;	//消息系统标记
long preparedTransactionOffset;	//消息预处理事务偏移量
Map<String, String> propertiesMap;	//消息属性
byte[] bitMap;	//位图
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

1)转发到ConsumerQueue
RocketMQ源码分析(三)消息存储

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                //消息分发
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

代码:DefaultMessageStore#putMessagePositionInfo

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    //获得消费队列
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    //消费队列分发消息
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

代码:DefaultMessageStore#putMessagePositionInfo

//依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
    //将消息追加到内存映射文件,异步输盘
    return mappedFile.appendMessage(this.byteBufferIndex.array());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2)转发到Index
RocketMQ源码分析(三)消息存储

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

代码:DefaultMessageStore#buildIndex

public void buildIndex(DispatchRequest req) {
    //获得索引文件
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        //获得文件最大物理偏移量
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        //如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }
		
        //如果消息ID不为空,则添加到Hash索引中
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                return;
            }
        }
		//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {

                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

1.4.6 消息队列和索引文件恢复

由于RocketMQ存储首先将消息全量存储在CommitLog文件中,然后异步生成转发任务更新ConsumerQueue和Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,导致CommitLog、ConsumerQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在CommitLog中文件中存在,但由于没有转发到ConsumerQueue,这部分消息将永远复发被消费者消费。
RocketMQ源码分析(三)消息存储
1)存储文件加载

代码:DefaultMessageStore#load

判断上一次是否异常退出。实现机制是Broker在启动时创建abort文件,在退出时通过JVM钩子函数删除abort文件。如果下次启动时存在abort文件。说明Broker时异常退出的,CommitLog与ConsumerQueue数据有可能不一致,需要进行修复。

//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {
    String fileName = StorePathConfigHelper
        .getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    return file.exists();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

代码:DefaultMessageStore#load

//加载延时队列
if (null != scheduleMessageService) {
    result = result && this.scheduleMessageService.load();
}

// 加载CommitLog文件
result = result && this.commitLog.load();

// 加载消费队列文件
result = result && this.loadConsumeQueue();

if (result) {
	//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点
    this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
	//加载index文件
    this.indexService.load(lastExitOK);
	//根据Broker是否异常退出,执行不同的恢复策略
    this.recover(lastExitOK);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

代码:MappedFileQueue#load

加载CommitLog到映射文件

//指向CommitLog文件目录
File dir = new File(this.storePath);
//获得文件数组
File[] files = dir.listFiles();
if (files != null) {
    // 文件排序
    Arrays.sort(files);
    //遍历文件
    for (File file : files) {
		//如果文件大小和配置文件不一致,退出
        if (file.length() != this.mappedFileSize) {
            
            return false;
        }

        try {
            //创建映射文件
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            //将映射文件添加到队列
            this.mappedFiles.add(mappedFile);
            log.info("load " + file.getPath() + " OK");
        } catch (IOException e) {
            log.error("load file " + file + " error", e);
            return false;
        }
    }
}

return true;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

代码:DefaultMessageStore#loadConsumeQueue

加载消息消费队列

//执行消费队列目录
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {

    for (File fileTopic : fileTopicList) {
        //获得子目录名称,即topic名称
        String topic = fileTopic.getName();
		//遍历子目录下的消费队列文件
        File[] fileQueueIdList = fileTopic.listFiles();
        if (fileQueueIdList != null) {
            //遍历文件
            for (File fileQueueId : fileQueueIdList) {
                //文件名称即队列ID
                int queueId;
                try {
                    queueId = Integer.parseInt(fileQueueId.getName());
                } catch (NumberFormatException e) {
                    continue;
                }
                //创建消费队列并加载到内存
                ConsumeQueue logic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                    this);
                this.putConsumeQueue(topic, queueId, logic);
                if (!logic.load()) {
                    return false;
                }
            }
        }
    }
}

log.info("load logics queue all over, OK");

return true;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

代码:IndexService#load

加载索引文件

public boolean load(final boolean lastExitOK) {
    //索引文件目录
    File dir = new File(this.storePath);
    //遍历索引文件
    File[] files = dir.listFiles();
    if (files != null) {
        //文件排序
        Arrays.sort(files);
        //遍历文件
        for (File file : files) {
            try {
                //加载索引文件
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();

                if (!lastExitOK) {
                    //索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                        .getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
				//将索引文件添加到队列
                log.info("load index file OK, " + f.getFileName());
                this.indexFileList.add(f);
            } catch (IOException e) {
                log.error("load file {} error", file, e);
                return false;
            } catch (NumberFormatException e) {
                log.error("load file {} error", file, e);
            }
        }
    }

    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

代码:DefaultMessageStore#recover

文件恢复,根据Broker是否正常退出执行不同的恢复策略

private void recover(final boolean lastExitOK) {
    //获得最大的物理便宜消费队列
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    if (lastExitOK) {
        //正常恢复
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        //异常恢复
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
	//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量
    this.recoverTopicQueueTable();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

代码:DefaultMessageStore#recoverTopicQueueTable

恢复ConsumerQueue后,将在CommitLog实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID、还存储了消息队列的关键所在。

public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    //CommitLog最小偏移量
    long minPhyOffset = this.commitLog.getMinOffset();
    //遍历消费队列,将消费队列保存在CommitLog中
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }
    this.commitLog.setTopicQueueTable(table);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2)正常恢复

代码:CommitLog#recoverNormally

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
	
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
         //Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        //代表当前已校验通过的offset
        long mappedFileOffset = 0;
        while (true) {
            //查找消息
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            //消息长度
            int size = dispatchRequest.getMsgSize();
           	//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
			//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。
            else if (dispatchRequest.isSuccess() && size == 0) {
              index++;
              if (index >= mappedFiles.size()) {
                  // Current branch can not happen
                  break;
              } else {
                  //取出每个文件
                  mappedFile = mappedFiles.get(index);
                  byteBuffer = mappedFile.sliceByteBuffer();
                  processOffset = mappedFile.getFileFromOffset();
                  mappedFileOffset = 0;
                  
          		}
            }
            // 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
		//更新MappedFileQueue的flushedWhere和committedWhere指针
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        //删除offset之后的所有文件
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

代码:MappedFileQueue#truncateDirtyFiles

public void truncateDirtyFiles(long offset) {
    List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
	//遍历目录下文件
    for (MappedFile file : this.mappedFiles) {
        //文件尾部的偏移量
        long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
        //文件尾部的偏移量大于offset
        if (fileTailOffset > offset) {
            //offset大于文件的起始偏移量
            if (offset >= file.getFileFromOffset()) {
                //更新wrotePosition、committedPosition、flushedPosistion
                file.setWrotePosition((int) (offset % this.mappedFileSize));
                file.setCommittedPosition((int) (offset % this.mappedFileSize));
                file.setFlushedPosition((int) (offset % this.mappedFileSize));
            } else {
                //offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件
                file.destroy(1000);
                willRemoveFiles.add(file);
            }
        }
    }

    this.deleteExpiredFile(willRemoveFiles);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3)异常恢复

Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果CommitLog目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。

代码:CommitLog#recoverAbnormally

if (!mappedFiles.isEmpty()) {
    // Looking beginning to recover from which file
    int index = mappedFiles.size() - 1;
    MappedFile mappedFile = null;
    for (; index >= 0; index--) {
        mappedFile = mappedFiles.get(index);
        //判断消息文件是否是一个正确的文件
        if (this.isMappedFileMatchedRecover(mappedFile)) {
            log.info("recover from this mapped file " + mappedFile.getFileName());
            break;
        }
    }
	//根据索引取出mappedFile文件
    if (index < 0) {
        index = 0;
        mappedFile = mappedFiles.get(index);
    }
    //...验证消息的合法性,并将消息转发到消息消费队列和索引文件
       
}else{
    //未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件
    this.mappedFileQueue.setFlushedWhere(0);
    this.mappedFileQueue.setCommittedWhere(0);
    this.defaultMessageStore.destroyLogics();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1.4.7 刷盘机制

RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。

同步刷盘

消息追加到内存后,立即将数据刷写到磁盘文件
RocketMQ源码分析(三)消息存储
代码:CommitLog#handleDiskFlush

//刷盘服务
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
    //封装刷盘请求
    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    //提交刷盘请求
    service.putRequest(request);
    //线程阻塞5秒,等待刷盘结束
    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    if (!flushOK) {
        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

GroupCommitRequest

RocketMQ源码分析(三)消息存储

long nextOffset;	//刷盘点偏移量
CountDownLatch countDownLatch = new CountDownLatch(1);	//倒计树锁存器
volatile boolean flushOK = false;	//刷盘结果;默认为false
  • 1
  • 2
  • 3

代码:GroupCommitService#run

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //线程等待10ms
            this.waitForRunning(10);
            //执行提交
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
	...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

代码:GroupCommitService#doCommit

private void doCommit() {
    //加锁
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            //遍历requestsRead
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
					//刷盘
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
				//唤醒发送消息客户端
                req.wakeupCustomer(flushOK);
            }
			
            //更新刷盘监测点
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {               CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
			
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

异步刷盘

在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。
RocketMQ源码分析(三)消息存储
开启transientStorePoolEnable后异步刷盘步骤:

  1. 将消息直接追加到ByteBuffer(堆外内存)
  2. CommitRealTimeService线程每隔200ms将ByteBuffer新追加内容提交到MappedByteBuffer中
  3. MappedByteBuffer在内存中追加提交的内容,wrotePosition指针向后移动
  4. commit操作成功返回,将committedPosition位置恢复
  5. FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘

代码:CommitLog$CommitRealTimeService#run

提交线程工作机制

//间隔时间,默认200ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//一次提交的至少页数
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//两次真实提交的最大间隔,默认200ms
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

//上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataThoroughInterval参数,直接提交
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
    this.lastCommitTimestamp = begin;
    commitDataLeastPages = 0;
}

//执行提交操作,将待提交数据提交到物理文件的内存映射区
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
    this.lastCommitTimestamp = end; // result = false means some data committed.
    //now wake up flush thread.
    //唤醒刷盘线程
    flushCommitLogService.wakeup();
}

if (end - begin > 500) {
    log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

代码:CommitLog$FlushRealTimeService#run

刷盘线程工作机制

//表示await方法等待,默认false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
//线程执行时间间隔
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
//一次刷写任务至少包含页数
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
//两次真实刷写任务最大间隔
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
...
//距离上次提交间隔超过flushPhysicQueueThoroughInterval,则本次刷盘任务将忽略flushPhysicQueueLeastPages,直接提交
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    printFlushProgress = (printTimes++ % 10) == 0;
}
...
//执行一次刷盘前,先等待指定时间间隔
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}
...
long begin = System.currentTimeMillis();
//刷写磁盘
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//更新存储监测点文件的时间戳
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

1.4.8 过期文件删除机制

由于RocketMQ操作CommitLog、ConsumerQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumerQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、ConsumerQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。

代码:DefaultMessageStore#addScheduleTask

private void addScheduleTask() {
	//每隔10s调度一次清除文件
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
	...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

代码:DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {
    //清除存储文件
    this.cleanCommitLogService.run();
    //清除消息消费队列文件
    this.cleanConsumeQueueService.run();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

代码:DefaultMessageStore#deleteExpiredFiles

private void deleteExpiredFiles() {
    //删除的数量
    int deleteCount = 0;
    //文件保留的时间
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    //删除物理文件的间隔
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    //线程被占用,第一次拒绝删除后能保留的最大时间,超过该时间,文件将被强制删除
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
	...执行删除逻辑
}else{
    ...无作为
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

删除文件操作的条件

  1. 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认4点
  2. 磁盘空间如果不充足,删除过期文件
  3. 预留,手工触发。

代码:CleanCommitLogService#isSpaceToDelete

当磁盘空间不足时执行删除过期文件

private boolean isSpaceToDelete() {
    //磁盘分区的最大使用量
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
	//是否需要立即执行删除过期文件操作
    cleanImmediately = false;

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        //当前CommitLog目录所在的磁盘分区的磁盘使用率
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        //diskSpaceWarningLevelRatio:磁盘使用率警告阈值,默认0.90
        if (physicRatio > diskSpaceWarningLevelRatio) {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }
			//diskSpaceCleanForciblyRatio:强制清除阈值,默认0.85
            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
            DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
        }
    }

    if (physicRatio < 0 || physicRatio > ratio) {
        DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

代码:MappedFileQueue#deleteExpiredFileByTime

执行文件销毁和删除

for (int i = 0; i < mfsLength; i++) {
    //遍历每隔文件
    MappedFile mappedFile = (MappedFile) mfs[i];
    //计算文件存活时间
    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
    //如果超过72小时,执行文件删除
    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
        if (mappedFile.destroy(intervalForcibly)) {
            files.add(mappedFile);
            deleteCount++;

            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                break;
            }

            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                try {
                    Thread.sleep(deleteFilesInterval);
                } catch (InterruptedException e) {
                }
            }
        } else {
            break;
        }
    } else {
        //avoid deleting files in the middle
        break;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

1.4.9 小结

RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。

CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。

当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。

RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。

上一篇:rocketMq-消息存储-commitLog


下一篇:深入研究Broker是如何持久化的