1.MappedFile类属性说明
dubbo的核心是spi,看懂了spi那么duboo基本上也懂了,对于rmq来说,它的核心是broker,而broker的核心是commitlog、consumequeue、indexfile,而这些文件对应的最终都是MappedFile,那么搞明白了这个类,那么对于broker的存储这块也就很容易明白了
1.1.MappedFile类属性如下
OS_PAGE_SIZE是4k,表示操作系统页
TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES 都是static变量,分别保存的是总共映射的数据容量,映射的总文件数
wrotePosition 表示当前写的位置,具体啥是当前写的位置,后续分析
committedPosition 表示当前提交的位置,具体咋理解,往后看
flushedPosition 表示当前刷新的位置,具体咋理解,往后看,以上这三个变量很重要
fileSize 表示该MappedFile对应的磁盘文件的size,对于commitlog来说是1G,对于consumequeue来说是6000000 字节,对于indexfile来说是42040字节
fileChannel表示该文件的通道
writeBuffer是堆外内存,在开启了transientStorePoolEnable=true的情况下非null,生产通常都会开启transientStorePoolEnable以供消费时候读写分离
transientStorePool是堆外内存池,在开启了transientStorePool的情况有有值
fileName 文件名,比如对于commitlog来说第一个文件是0*10243,第二个文件是1*10243
fileFromOffset 即文件的起始位置(相对于整个commitlog or consumequeue来说),比如commitlog consumequeue文件来说,该值就是fileName
file 就是文件
mappedByteBuffer 即pagecache,通过fileChannel.map生成
storeTimestamp 最后一次消息的存储时间
firstCreateInQueue 对于队列有用
1.2.MappedFile构造器说明
有两个构造器
MappedFile(final String fileName, final int fileSize)
MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool)
在不开启transientStorePoolEnable=true的情况下,都是使用第一个构造器
在开启的情况下,broker启动进行load操作加载commitlog consumequeue indexfile文件都是使用第一个构造器,在broker运行过程中,consumequeue indexfiel创建也是使用第一个构造器,在broker运行过程中,创建commitlog文件都是使用的第二个构造器,ctrl+alt+H查看调用如下图
在AllocateMappedFileService.mmapOperation()内通过jdk的spi机制加载META-INF目录下org.apache.rocketmq.store.MappedFile文件,默认是没有该文件的,代码如下
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();//抛出异常,默认META-INF下是没有org.apache.rocketmq.store.MappedFile文件的
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {//捕获异常
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());//走这里
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
生产通常是开启transientStorePoolEnable=true的,那么以第二个构造器为例说明,第二个构造器明白了,第一个构造器自然就明白了
public MappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();//从堆外内存缓冲池构建堆外内存
this.transientStorePool = transientStorePool;
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;//文件名
this.fileSize = fileSize;//文件大小
this.file = new File(fileName);//根据文件构造File
this.fileFromOffset = Long.parseLong(this.file.getName());//文件名作为起始offset
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 将此通道的文件区域直接映射到内存中,该属性就是pagecache
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
该构造器就是根据文件名路径构造MappedFile文件,构造堆外内存、pagecache属性,此时创建后wrotePosition committedPosition flushedPosition三个位置都是0。这两个构造器的唯一区别就是第一个构造器是不赋值其堆外内存属性writeBuffer。
这里记录下堆外内存池TransientStorePool的来源
BrokerStartup.main(String[]) //broker启动入口
BrokerStartup.createBrokerController(String[])//创建BrokerController
BrokerController.initialize()//BrokerController初始化
new DefaultMessageStore(MessageStoreConfig, BrokerStatsManager, MessageArrivingListener, BrokerConfig)//创建DefaultMessageStore,代码如下
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
//省略其他代码
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {//默认false,如果broker开启了transientStorePoolEnable=true,则执行。transient含义是短暂的
this.transientStorePool.init();
}
//省略其他代码
}
//TransientStorePool
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);//创建堆外内存DirectByteBuffer
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));//在物理内存锁定1G,使用的jna,如果以后自己项目需要进行锁定内存,那么则可以参考这里
availableBuffers.offer(byteBuffer);//把申请的堆外内存缓存起来
}
}
2.MappedFileQueue类说明
2.1.属性说明
storePath 是目录路径,对commitlog来说是${rocketmq-store}/commitlog 对consumequeue来说是${rocketmq-store}/consumequeue
mappedFileSize 单个MappedFile文件对应的磁盘文件的size,对commitlog来说是1024^3即1G 对consumequeue来说是30w*20即600w
mappedFiles 即MappedFile集合,是CopyOnWriteArrayList类型,并发集合
allocateMappedFileService 分配MappedFile服务AllocateMappedFileService,是个runnable对象,启动该服务用于创建commitlog MappedFile对象,对于commitlog来说该属性是AllocateMappedFileService,对于consumequeue来说该属性是null
flushedWhere committedWhere 分别表示刷新位置,提交位置,跟MappedFile的三个位置关联,具体后续说明
storeTimestamp 最后存储的时间戳
实际MappedFileQueue就是个MappedFile集合,个人认为叫MappedFileList更贴切,叫MappedFileQueue刚开始看的时候总是容易混淆。
2.2.MappedFileQueue构造器说明
MappedFileQueue构造器调用:
2.2.1.对于commitlog
在broker启动时候初始化BrokerController的时候创建DefaultMessageStore的时候创建CommitLog对象的时候调用,commitlog对象对应的文件结构如图
2.2.2.对于consumequeue
这个调用就比较多
2.2.2.1.在broker启动进行load操作的时候加载${rocketmq-store}/consumequeue/$topic/$queueId/$fileName的时候针对每一个topic下的queueId下的文件都创建一个ConsumeQueue
一个ConsumeQueue等同一个MappedFileQueue等同多个MappedFile,为什么这么做呢?因为一个消息队列会有多个文件,如下图
2.2.2.2.在broker运行中DefaultMessageStore.findConsumeQueue(String, int)根据topic queueId查找ConsumeQueue,如果查找不到则创建ConsumeQueue对象,创建方式2.2.2.1
3.commitlog文件与MappedFileQueue MappedFile关系
在broker启动的时候创建commitlog对象并加载load磁盘的commitlog文件,并从正常or异常关闭情况恢复,在运行过程中保存消息
3.1.broker启动创建commitlog对象,即创建MappedFileQueue对象
3.2.broker启动加载load consumequeue文件
对于创建consumequeue对象来说,只是创建MappedFileQueue对象,并不创建具体的文件对象MappedFile
3.3.broker启动加载load commitlog文件
org.apache.rocketmq.store.MappedFileQueue.load()方法是加载${rocketmq-store}/commitlog or ${rocketmq-store}/consumequeue目录下的文件,每个文件对应创建一个MappedFile,新建的MappedFile对象wrotePosition committedPosition flushedPosition属性都设置为文件名(起始位置),并把新建的MappedFile对象添加到缓存MappedFileQueue.mappedFiles
至此,commitlog对象对应的MappedFileQueue的flushedWhere committedWhere都是0,每个MappedFile对象的wrotePosition committedPosition flushedPosition属性都是文件名
代码如下
//org.apache.rocketmq.store.DefaultMessageStore.load()
public boolean load() {
boolean result = true;
try {
boolean lastExitOK = !this.isTempFileExist();//存在abort文件,说明broker上次是异常关闭,因为broker启动后会创建abort文件,正常关闭会删除该文件,启动时候存在该文件,说明上次是异常关闭
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");//abnormally不正常的
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();//加载${rocketmq_home}\store\config/delayOffset.json,该文件保存的是每个延时队列的消费offset,16个延时级别,16个队列
}
// load Commit Log
result = result && this.commitLog.load();//加载${rocketmq_home}\store\commitlog下的commitlog数据文件
// load Consume Queue
result = result && this.loadConsumeQueue();//加载${rocketmq_home}\store\consumequeue下的消费队列
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载checkpoint文件,该文件是用于异常关闭恢复,保存的是刷盘位置
this.indexService.load(lastExitOK);//加载加载${rocketmq_home}\store\index目录下的文件
this.recover(lastExitOK);//使用commitLog恢复上次异常/正常关闭的broker
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
3.3.1.commitlog加载代码如下
//org.apache.rocketmq.store.CommitLog.load()
public boolean load() {
boolean result = this.mappedFileQueue.load();//加载I:\rocketmq\store\commitlog
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
//org.apache.rocketmq.store.MappedFileQueue.load()
public boolean load() {
File dir = new File(this.storePath);//${rocketmq_home}\store\commitlog
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);//为每个文件创建对应的堆外内存映射
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
最终加载的commitlog文件保存到了CommitLog.mappedFileQueue.mappedFiles集合中。而commitlog对象又被包含在DefaultMessageStore,DefaultMessageStore又被包含在BrokerController对象内,最终在broker启动加载commitlog文件就被加载到了broker上。
3.3.2.consumequeue加载如下代码
//org.apache.rocketmq.store.DefaultMessageStore.loadConsumeQueue()
private boolean loadConsumeQueue() {
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));//目录${rocketmq}\store\consumequeue
File[] fileTopicList = dirLogic.listFiles();//列举出${rocketmq}\store\consumequeue目录下的所有文件
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {//遍历topic目录文件
String topic = fileTopic.getName();
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
for (File fileQueueId : fileQueueIdList) {//遍历队列目录下的文件
int queueId;
try {
queueId = Integer.parseInt(fileQueueId.getName());
} catch (NumberFormatException e) {
continue;
}
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);//构建ConsumeQueue,该对象对应MappedFileQueue,对应多个MappedFile
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {//加载consumequeue文件
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
}
//org.apache.rocketmq.store.ConsumeQueue.load()
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
if (isExtReadEnable()) {//false
result &= this.consumeQueueExt.load();
}
return result;
}
最终每个consumequeue被加载到了org.apache.rocketmq.store.DefaultMessageStore.consumeQueueTable集合中保存,key是topic,value是queueID和ConsumeQueue的映射集合
3.3.3.indexfile加载如下
//org.apache.rocketmq.store.index.IndexService.load(boolean)
public boolean load(final boolean lastExitOK) {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {//遍历${rocketmq_home}\store\index目录下的indexfile文件
try {
IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);//吧每个indexfile文件包装为IndexFile对象
f.load();//加载indexfile文件,即把前40字节保存到IndexHeader
if (!lastExitOK) {//broker上次是异常关闭
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
.getIndexMsgTimestamp()) {//如果indexfile的结束时间戳(保存到indexfile的最后一条消息的时间戳)大于StoreCheckpoint索引刷盘时间戳,则销毁该indexfile对象,即释放对应的MappedFile对象(该对象包装了堆外内存,释放堆外内存)。 为什么要销毁呢?因为StoreCheckpoint是刷盘保存点,用于保存commitlog consumequeue indexfile刷盘的位置,便于异常关闭恢复。如果indexfile的结束时间戳大于StoreCheckpoint索引刷盘时间戳,则说明该IndexFile是由于broker异常关闭并没有被刷盘
f.destroy(0);//释放该MappedFile对象,释放堆外内存
continue;
}
}
log.info("load index file OK, " + f.getFileName());
this.indexFileList.add(f);
} catch (IOException e) {
log.error("load file {} error", file, e);
return false;
} catch (NumberFormatException e) {
log.error("load file {} error", file, e);
}
}
}
return true;
}
//org.apache.rocketmq.store.index.IndexFile.load()
public void load() {
this.indexHeader.load();
}
//org.apache.rocketmq.store.index.IndexHeader.load()
public void load() {
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));//获取indexfile的前8字节,即起始时间戳
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));//结束时间戳 8字节
this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));//在commitlog的起始offset 8字节
this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));//在commitlog的结束offset 8字节
this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));//已占用的slot数量 4字节
this.indexCount.set(byteBuffer.getInt(indexCountIndex));//已经使用的index数量 4字节
if (this.indexCount.get() <= 0) {
this.indexCount.set(1);
}
}
indexfile文件的格式如下
最终每个indexfile被加载到org.apache.rocketmq.store.index.IndexService.indexFileList集合保存,IndexService又包含在DefaultMessageStore。indexfile和上述两个对象有区别,它只是包装了MappedFile对象,而Commitlog ConsumeQueue对象都是包装了MappedFileQueue对象,包装了MappedFile集合
综上,broker启动的时候加载commitlog consumequeue indexfile到broker,关系如图
3.4.broker启动恢复consumequeu 和commitlog
分为broker上次是正常关闭、异常关闭两种情况
//org.apache.rocketmq.store.DefaultMessageStore.recover(boolean)
private void recover(final boolean lastExitOK) {
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();//恢复consumerqueue
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);//broker上次是正常关闭
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);//broker上次是异常关闭
}
this.recoverTopicQueueTable();
}
3.4.1.恢复consumerqueue
/*
* 恢复消费队列,返回所有消费队列内的最大offset,即该offset就是commitlog中已经转储到消费队列的offset
*/
//org.apache.rocketmq.store.DefaultMessageStore.recoverConsumeQueue()
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {//遍历同topic下的所有ConsumeQueue集合
for (ConsumeQueue logic : maps.values()) {//遍历同queueID下的所有ConsumeQueue
logic.recover();//恢复消费队列
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;//返回所有消息队列文件内消息在commitlog中的最大偏移量
}
//org.apache.rocketmq.store.ConsumeQueue.recover()
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
int index = mappedFiles.size() - 3;//最多恢复三个
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;//20字节
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//共用同一个缓冲区,但是position各自独立
long processOffset = mappedFile.getFileFromOffset();//队列文件名
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {//消费队列存储单元是一个20字节定长数据,commitlog offset(8) + size(4) + message tag hashcode(8),commitlog offset是指这条消息在commitlog文件实际偏移量,size指消息大小,消息tag的哈希值,用于校验
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();//先读取8字节,即commitlog offset
int size = byteBuffer.getInt();//再读取4字节,即msg size
long tagsCode = byteBuffer.getLong();//再读取8字节,即message tag hashcode
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;//consumequeue上当前消息末尾位置,该值为20*N,其中N是表示当前消息在consumequeue上是第几个消息
this.maxPhysicOffset = offset;//队列内消息在commitlog中的偏移量,this.maxPhysicOffset最终为该队列下的consumequeue文件内的消息在commitlog的最大物理偏移量,即在commitlog的位置,该值也就是commitlog转储到consumequeue的位置,该位置后的消息就需要转储到consumequeue
if (isExtAddr(tagsCode)) {//用于扩展的consumequeue,忽略,默认是不开启,生产通常也不开启,没有研究过这个
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
if (mappedFileOffset == mappedFileSizeLogics) {//达到consumequeue文件末尾
index++;
if (index >= mappedFiles.size()) {//遍历到该队列下是最后一个consumequeue文件则退出循环
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);//获取下一个mappedFile对象
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();//重置processOffset为mappedFile文件名
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
processOffset += mappedFileOffset;//processOffset
this.mappedFileQueue.setFlushedWhere(processOffset);//设置刷新位置
this.mappedFileQueue.setCommittedWhere(processOffset);//设置提交位置
this.mappedFileQueue.truncateDirtyFiles(processOffset);//清理大于指定offset的脏文件
if (isExtReadEnable()) {//忽略,生产也不开启扩展consumequeue
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
3.4.2.恢复commitlog
recoverConsumeQueue返回了已经刷盘到consumequeue的commitlog offset,
3.4.2.1. 上次broker是正常关闭
org.apache.rocketmq.store.CommitLog.recoverNormally(long)大体逻辑和recoverConsumeQueue是相同的,不同之处是consumequeue每条记录是固定20字节,而commitlog内每条记录即一条消息,是变长的,commitlog格式如图(该图是网上截图得来,具体格式可以在org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner)方法内查看)
recoverNormally操作也是最多恢复最新的三个commitlog文件,每次读取commitlog中一条消息,最终获取到commitlog对象已经刷盘的位置processOffset,并设置Commitlog对象的MappedFileQueue的刷盘flushedWhere和提交点committedWhere位置为processOffset,清除脏commitlog文件。这个步骤逻辑级别和recoverConsumeQueue相同,不同之处是如果processOffset<=maxPhyOffsetOfConsumeQueue(该值是recoverConsumeQueue操作返回的commitlog已经转储到consumequeue的最大commitlog offset)的时候,需要执行org.apache.rocketmq.store.DefaultMessageStore.truncateDirtyLogicFiles(long)
//org.apache.rocketmq.store.DefaultMessageStore.truncateDirtyLogicFiles(long)
public void truncateDirtyLogicFiles(long phyOffset) {
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.truncateDirtyLogicFiles(phyOffset);
}
}
}
//org.apache.rocketmq.store.ConsumeQueue.truncateDirtyLogicFiles(long)
public void truncateDirtyLogicFiles(long phyOffet) {
int logicFileSize = this.mappedFileSize;//600w
this.maxPhysicOffset = phyOffet - 1;
long maxExtAddr = 1;
while (true) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//获取consumequeue的最后一个MappedFile,即文件名最大的那个MappedFile
if (mappedFile != null) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
mappedFile.setWrotePosition(0);
mappedFile.setCommittedPosition(0);
mappedFile.setFlushedPosition(0);
for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();//commitlog offset 8字节
int size = byteBuffer.getInt();//msg size 4字节
long tagsCode = byteBuffer.getLong();//tag hashcode 8字节
if (0 == i) {//第一条消息
if (offset >= phyOffet) {
this.mappedFileQueue.deleteLastMappedFile();//从MappedFileQueue.mappedFiles移除最后的MappedFile,并释放该MappedFile
break;//跳出for循环,继续执行while循环,获取删除当前MappedFile后的mappedFileQueue的最后一个文件继续重复执行该步骤
} else {
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {//默认不使用扩展consumequeue.忽略
maxExtAddr = tagsCode;
}
}
} else {
if (offset >= 0 && size > 0) {
if (offset >= phyOffet) {//MappedFile的非第一条消息offset>= phyOffet,说明当前的MappedFile既有
return;
}
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
this.maxPhysicOffset = offset;
if (isExtAddr(tagsCode)) {//默认不使用扩展consumequeue.忽略
maxExtAddr = tagsCode;
}
if (pos == logicFileSize) {//达到当前MappedFile末尾,则结束
return;
}
} else {
return;
}
}
}
} else {
break;//mappedFile==null 退出while循环
}
}
if (isExtReadEnable()) {//默认不使用扩展consumequeue.忽略
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
DefaultMessageStore.truncateDirtyLogicFiles(long)方法,什么情况下会进入?暂时没搞清楚
3.4.2.2. 上次broker是异常关闭
org.apache.rocketmq.store.CommitLog.recoverAbnormally(long)跟recoverNormally正常关闭恢复commitlog基本相同,代码加了注释,看如下代码
//org.apache.rocketmq.store.CommitLog.recoverAbnormally(long)
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();//true 默认校验crc
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {//从commitlog文件倒序遍历
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {//如果该commitlog的第一天消息存储时间戳<=刷盘检测点的刷盘时间,则说明该commitlog是需要恢复的,否则就继续遍历前一个commitlog
log.info("recover from this mapped file " + mappedFile.getFileName());//recover from this mapped file
break;
}
}
//剩余代码逻辑等同recoverNormally
}
// Commitlog case files are deleted
else {//如果不存在commitlog文件,比如过期被清除了or误删了
this.mappedFileQueue.setFlushedWhere(0);//设置mappedFileQueue的刷盘位置为0
this.mappedFileQueue.setCommittedWhere(0);//设置mappedFileQueue的提交位置为0
this.defaultMessageStore.destroyLogics();//删除所有的consumequeue文件
}
//org.apache.rocketmq.store.CommitLog.isMappedFileMatchedRecover(MappedFile)
/*
* 读取该commitlog的第一条消息的存储时间戳,如果时间戳<=Math.min(刷盘检测点commitlog最后刷盘时间, 刷盘检测点consumequeue最终刷盘时间 ) - 3s,
* 则说明该commitlog文件是需要进行恢复的
*/
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//commitlog组成 消息大小(4) +魔术(4)+消息报文体校验码(4)+队列id(4)+flag(4)+当前消息在队列中的第几个(8)+物理地址偏移量(8)+SYSFLAG(4)+producer时间戳(8)+producer地址(8)+消息在broker存储的时间(8)+消息存储到broker的地址(8)+消息被重新消费了几次(4)+prepard状态的事务消息(8)+bodylenght(4)+body(消息体)+(1)+topicLength+(2)+propertiesLength
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);//获取魔术daa320a7
if (magicCode != MESSAGE_MAGIC_CODE) {//魔数不对,返回false,说明该commitlog不符合规则
return false;
}
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);//获取存储到broker的timestamp
if (0 == storeTimestamp) {//存储时间为0,说明没有消息,因为commitlog创建后默认填充就是0x00
return false;
}
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {//默认false
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {//执行这里
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {//该commitlog第一条消息存储时间戳<=Math.min(刷盘检测点commitlog最后刷盘时间, 刷盘检测点consumequeue最终刷盘时间 ) - 3s
log.info("find check timestamp, {} {}",
storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}
4.broker运行中消息写入commitlog保存
producer发送消息到broker,发送命令是SEND_MESSAGE_V2,broker端对应的处理器是SendMessageProcessor,如下图,是broker端收到消息后的处理堆栈(红框内)
核心方法是org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner),代码如下,注释加的比较清楚,应该对于逻辑比较容易懂
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());//设置消息落地时间
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();//DefaultMessageStore.storeStatsService
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());//从消息的SYSFlag判断消息类型
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {//事务消息,非事务消息
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {//如果消息的延迟级别>0 说明消息设置了延时
/*
* 如果消息的延迟级别>0 将消息的原topicName和原消息队列ID存入消息属性中,
* 用延迟消息主题SCHEDULE_TOPIC_XXXX,消息队列ID更新原消息的主题与队列
* 对于producer发送的延时消息,消费端消费失败重发的消息都是有延时级别,都会被更改topic保存到commitlog
*/
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 对于重发交易,它的延时级别都是大于0的,因此重发交易在消费失败的时候发送到broker保存的时候都先被更高为重试topic,接着在保存到commitlog的时候被保存为schedule主题
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());//将消息的原topicName和原消息队列ID存入消息属性中,
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//Commit消息 包括普通消息, 重发消息,延时消息,事务消息
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//获取当前可以写入的commitlog文件,获取一个MappedFile对象,内存映射的具体实现
//消息写入是同步操作
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config 加锁 默认使用自旋锁加锁 即cas加锁
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();//加锁时间 当前时间
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);//设置消息存储时间戳
if (null == mappedFile || mappedFile.isFull()) {
/*
* 如果mappedFile==null 则需要新建mappedFile对象,即新建commitlog
* 如果mappedFile.isFull() 意思是commitlog文件满了,mappedFile.wrotePosition==1024^3,写道了文件末尾
* 创建commitlog文件,创建mappedFile对象
*/
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);//消息写入commitlog文件对应的pagecache
switch (result.getStatus()) {
case PUT_OK:
break;//写入成功
case END_OF_FILE://文commitlog剩余空间不足写入该消息,则新建commitlog文件,通常创建commitlog是走该分支,mappedFile.isFull()的情况太罕见了,需要恰好一条消息的长度剩余commitlog的剩余写入空间
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);//新建commitlog文件
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);//消息写入新commitlog文件对应的pagecache
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();//cas解锁
}
if (eclipseTimeInLock > 500) {//消息写入pagecache耗时超过500ms报警
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
//如果broker开启了文件预热warmMapedFileEnable=true,则通过jna释放commitlog创建时候pagecache锁定的物理内存。生产通常是开启文件预热的,避免日志文件在分配内存时缺页中断
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);//设置返回producer的结果
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, msg);//刷盘,吧消息保存到磁盘
handleHA(result, putMessageResult, msg);//ha同步消息
return putMessageResult;
}
4.1.消息写入到commitlog
消息写入到commitlog,即追加到pagecache的方法,如下方法
//org.apache.rocketmq.store.MappedFile.appendMessagesInner(MessageExt, AppendMessageCallback)
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {//cb=DefaultAppendMessageCallback
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();//当前MappedFile的写位置
if (currentPos < this.fileSize) {//当前MappedFile的写位置<文件大小,说明文件有剩余位置可写
/*
* 仅当transientStorePoolEnable 为true,刷盘策略为异步刷盘(FlushDiskType为ASYNC_FLUSH),
* 并且broker为主节点时,才启用堆外分配内存。此时:writeBuffer不为null
* Buffer与同步和异步刷盘相关
* writeBuffer/mappedByteBuffer的position始终为0,而limit则始终等于capacity
* slice创建一个新的buffer, 是根据position和limit来生成byteBuffer,与原buf共享同一内存
* 开启了transientStorePoolEnable,数据是写入到堆外内存,即this.writeBuffer
*/
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();//重点
byteBuffer.position(currentPos);//设置写位置
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {//根据消息类型,是批量消息还是单个消息,进入相应的处理
//处理单个消息, this.fileSize - currentPos是可写的空间
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
//处理批量消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());//把MappedFile中的写位置更新为写了消息之后的位置
this.storeTimestamp = result.getStoreTimestamp();//更新storeTimestamp为消息存储时间戳,即broker接收到消息的时间戳
return result;
}
//写满会报错,正常不会进入该代码,调用该方法前有判断
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
以追加单个消息为例说明org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner),该方法就是按照commitlog格式吧消息写入到传入的参数ByteBuffer内,如果是开启了transientStorePoolEnable=true,则该ByteBuffer是MappedFile.writeBuffer堆外内存,未开启则是MappedFile.mappedByteBuffer即pagecache。该pagecache是对应具体的commitlog磁盘文件,该writeBuffer是对应什么呢?在刷盘服务线程下会把writeBuffer内的数据写入到pagecache,再由pagecache最终刷新保存到commitlog磁盘上。下面到刷盘时候自然清楚。因此开启transientStorePoolEnable=true的情况下是吧消息写入堆外内存即MappedFile.writeBuffer,未开启情况下是写入到pagecache即MappedFile.mappedByteBuffer。
说到这里,在commitlog剩余空间不足以写入该消息,需要新建commitlog,那么是如何实现的?
在org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)内的
mappedFile = this.mappedFileQueue.getLastMappedFile(0)
这行代码实现的
//org.apache.rocketmq.store.MappedFileQueue.getLastMappedFile(long, boolean)
/*
* 功能:获取最新的MappedFile对象
* 如果没有或者上个commitlog/consumequeue已经满了,则新创建一个commitlog/consumequeue文件
* 对于创建commitlog来说,通常创建的原因是剩余空间不足一写入一条消息,传入参数startOffset==0 , needCreate=true
* 对于创建consumequeue来说,通常创建的原因是因为真满了,因为consumequeue是固定长度,传入参数startOffset==300w*20 , needCreate=true
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();//获取最新的MappedFile
// 一个映射文件都不存在 createOffset=0
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// 已经存在了MappedFile文件,且文件满了则创建
if (mappedFileLast != null && mappedFileLast.isFull()) {//mappedFileLast.isFull()对于commitlog来说是很罕见遇到的,因此创建commitlog通常不走这里,但是由于consumequeue每个消息是固定长度,每次是会写满通常走这里
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;//createOffset是上个文件名+文件size,用作新建的MappedFile文件名。公式就是createOffset=(N-1)*this.mappedFileSize 其中N为第几个MappedFile
}
// 创建新的commitlog/consumequeue文件对象MappedFile
if (createOffset != -1 && needCreate) {//对于commitlog创建来说,通常执行到这里createOffset==0
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);//以createOffset作为commitlog or consumequeue的文件名
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 创建commitlog走这里,因为开启后this.allocateMappedFileService为AllocateMappedFileService,在broker启动时候new Commitlog的时候赋值的
if (this.allocateMappedFileService != null) {//true
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);//由AllocateMappedFileService线程异步创建mappedFile,并同步等待获取创建结果
} else {//consumequeue文件创建走这里
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);//创建consumequeue对应的MappedFile
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);//第一个设置MappedFile.firstCreateInQueue=true
}
this.mappedFiles.add(mappedFile);//添加到MappedFileQueue集合
}
return mappedFile;//文件满了则返回新建的文件
}
return mappedFileLast;//文件未满则返回最新的MappedFile
}
这里有个难点是mappedFileLast.isFull()的判断,因为对于consumequeue来说每条记录是固定20字节,写位置是每次达到末尾(即文件size)结束。但是对于commitlog来说,每条记录(消息)是变长的,比如要写入的消息长度是200字节,但是commitlog只有70字节空间,因此这样情况是需要新建commitlog文件,吧消息写入到新的commitlog内,但是旧的commitlog的写位置MappedFile.wrotePosition怎么就变成了MappedFile.fileSize呢?因为MappedFile.wrotePosition变成MappedFile.fileSize是很恰好发生的事情,加入待写入的消息长度和commitlog剩余空间恰好相等,则说明正好容纳,这种很罕见的情况,在写完后wrotePositionfileSize,但是实际也不会发生,因为如果写入,MappedFile.wrotePosition最大只能达到MappedFile.fileSize-8,因为commitlog预留末尾8字节作为commitlog结束。那么是如何使剩余空间不足够写入消息而让wrotePositionfileSize呢?经过分析得知在写入消息方法org.apache.rocketmq.store.MappedFile.appendMessagesInner(MessageExt, AppendMessageCallback)内,追加消息到commitlog后,执行this.wrotePosition.addAndGet(result.getWroteBytes());,这里result.getWroteBytes()是AppendMessageResult.wroteBytes,该值是在AppendMessageResult构造器赋值,在org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, MessageExtBrokerInner)内END_OF_FILE这行,如果消息长度+8字节(commitlog文件结束的保留字节)>commitlog剩余空间maxBlank,则构造AppendMessageResult对象,AppendMessageResult.wroteBytes=maxBlank,而maxBlank==MappedFile.fileSize-MappedFile.wrotePosition,那么在commitlog剩余空间不足写入该条消息的情况下,在返回AppendMessageResult对象END_OF_FILE错误后,写位置变为MappedFile.wrotePosition+maxBlank=MappedFile.wrotePosition+MappedFile.fileSize-MappedFile.wrotePosition=MappedFile.fileSize,即在剩余空间不足写入该条消息的时候,写位置会被更新为文件大小,表示文件满了,需要创建文件。
4.2.消息刷盘
消息写入到pagechace/堆外内存后,执行刷盘handleDiskFlush(消息保存到磁盘),分为同步刷盘和异步刷盘,入口方法如下
//org.apache.rocketmq.store.CommitLog.handleDiskFlush(AppendMessageResult, PutMessageResult, MessageExt)
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//同步刷写,这里有两种配置,是否一定要收到存储MSG信息,才返回,默认为true
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {//默认是true, 代码@1
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());//代码@2 result.getWroteOffset() + result.getWroteBytes()即为MappedFile写入消息后的MappedFile.wrotePosition+MappedFile.fileFromOffset
service.putRequest(request);//代码@3
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());代码@4
if (!flushOK) {//即GroupCommitRequest.flushOK为false,表示刷盘超时,那么设置返回结果为超时
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {//代码@5 通常不走该分支,既然是同步刷盘,那么为了保证100%的消息不丢失,只能是消息写入到磁盘后才不会丢失,那么就只能同步等待消息写入到磁盘后才能返回。
service.wakeup();//唤醒同步刷盘线程进行刷屏。
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//未开启transientStorePoolEnable=true且异步刷盘
flushCommitLogService.wakeup();//代码@6 异步刷盘
} else {//开启了transientStorePoolEnable=true且异步刷盘
commitLogService.wakeup();//代码@7 唤醒把消息写入pagecache的线程
}
}
}
这里解释下为什么代码@1处默认是true,因为通常producer创建消息采用的构造器是
//producer构造Message通常使用下面三个构造器
Message(String topic, byte[] body)
Message(String topic, String tags, byte[] body)
Message(String topic, String tags, String keys, byte[] body)
//对于这三个构造器,均设置 this.setWaitStoreMsgOK(true),即属性PROPERTY_WAIT_STORE_MSG_OK均设置为true
//基本不使用的构造器
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)
//该构造器可以指定PROPERTY_WAIT_STORE_MSG_OK属性为false
因此默认通常PROPERTY_WAIT_STORE_MSG_OK是为true的,表示的含义是等待收到消息存储成功。可否使用第四个构造器设置waitStoreMsgOK=false呢?也是可以的,但是这样的会在handleDiskFlush内同步刷盘就走到了代码@5分支,这就无法保证同步刷盘的100%消息落地,而且这样的效率不如异步刷盘,为什么不使用异步刷盘呢,因此代码@5分支实际没人使用。
this.flushCommitLogServic是个刷盘服务类FlushCommitLogService的具体子类,是个runnable对象,根据不同的刷盘方式赋值为不同的对象,同步刷盘为GroupCommitService,异步刷盘为FlushRealTimeService,在broker启动中org.apache.rocketmq.store.CommitLog.start()启动该服务线程的,调用关系如下
4.2.1.同步刷盘
在handleDiskFlush内代码解释
代码@2,result.getWroteOffset() + result.getWroteBytes()即为MappedFile写入消息后的MappedFile.wrotePosition+MappedFile.fileFromOffset,即为整个commitlog对象(也为MappedFileQueue,因为代表MappedFile集合)上的位置,MappedFileQueue.flushedWhere到该位置之间的数据就是需要刷新到磁盘的。
代码@3,吧GroupCommitRequest保存到GroupCommitService.requestsWrite集合,并唤醒阻塞的GroupCommitService服务线程(同步刷盘线程)
public synchronized void putRequest(final GroupCommitRequest request) {//handleDiskFlush()操作执行
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify 唤醒阻塞的GroupCommitService服务线程
}
}
代码@4,就是同步等待数据落盘,阻塞操作。
具体同步刷盘方式是GroupCommitService线程不停的轮询,并提交
//org.apache.rocketmq.store.CommitLog.GroupCommitService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
//省略其他代码
}
接着看GroupCommitService.waitForRunning(long)方法
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
private void swapRequests() {//交换两个集合数据,这样在handleDiskFlush添加到requestsWrite的GroupCommitRequest对象就被交换到了requestsRead
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
在同步刷盘服务线程不停轮询的过程中,每次把handleDiskFlush内添加到requestsWrite的GroupCommitRequest对象交换到了requestsRead,这个设计不错,进行了读写分离,但是为什么要设计的这么麻烦呢?直接在handleDiskFlush内把GroupCommitRequest对象提交到GroupCommitService的一个阻塞队列不行么?难道是效率什么的考虑,
接着执行刷盘操作doCommit,代码如下,注释很清楚了,就不写说明了。
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {//this.requestsRead虽然是集合,但是实际有且只会有一个元素,定为集合是为了考虑将来扩展吧
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {//循环第一次,flushOK为false,那么进行刷盘,接着循环第二次的时候,flushOK被置为true,退出for循环
//如果commitlog上次刷盘点MappedFileQueue.flushedWhere>=本次待刷盘位置,则不进行刷盘,接着退出for循环
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//for循环第一次为false,第二次为true
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);//执行刷盘操作,把消息从pagecache写入到磁盘保存
}
}
req.wakeupCustomer(flushOK);//设置GroupCommitRequest.flushOK为true,表示刷盘成功,该值在handleDiskFlush内用作判断在等待时间内刷盘是否成功。唤醒用户线程,即唤醒handleDiskFlush的代码@3
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//每次刷盘后把刷盘时间戳保存到StoreCheckpoint.physicMsgTimestamp,以供broker异常关闭后启动恢复刷盘位置,这里和broker启动进行recover操作对应
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();//清空
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);//这里对应着handleDiskFlush的代码@5,直接就刷盘
}
}
}
4.2.2.异步刷盘
4.2.2.1.异步刷盘未开启transientStorePoolEnable=true的情况
handleDiskFlush的代码@6 处是未开启transientStorePoolEnable=true且异步刷盘的情况,执行FlushRealTimeService.wakeup(),唤醒异步刷盘服务线程FlushRealTimeService
异步刷盘服务线程的执行
//org.apache.rocketmq.store.CommitLog.FlushRealTimeService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();//false
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//异步刷盘间隔时间500ms
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();//默认操作系统提交页数 4
//省略不重要代码
try {
//省略不重要代码
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);//把数据刷新到磁盘
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//每次刷盘后把刷盘时间戳保存到StoreCheckpoint.physicMsgTimestamp,以供broker异常关闭后启动恢复刷盘位置,这里和broker启动进行recover操作对应
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}//while end
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
//如果broker被关闭,则执行到这里。由于是异步刷盘,此时磁盘数据落后commitlog,那么尽力吧commitlog数据刷新到磁盘
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
//省略不重要代码
CommitLog.log.info(this.getServiceName() + " service end");
}
异步刷盘较同步刷盘逻辑简单,仅仅是刷盘而已,没有那么多的控制。
4.2.2.2.异步刷盘开启transientStorePoolEnable=true的情况
handleDiskFlush的代码@7 处是开启transientStorePoolEnable=true且异步刷盘的情况,执行CommitRealTimeService.wakeup(),唤醒异步刷盘服务线程CommitRealTimeService,这个和FlushRealTimeService不同
具体执行看代码和其中注释
//org.apache.rocketmq.store.CommitLog.CommitRealTimeService.run()
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();//刷盘频率 200ms
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();//默认操作系统页 4
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();//最大提交数据间隔时间 200ms,设置该值可以提高broker性能
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;//如果上次提交数据的时间戳+commitDataThoroughInterval<=当前时间,说明消息过少,不满足4页的提交,在达到最大提交时间后,强制不满足4页提交条件也提交数据到pagecache
}
try {
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);//把数据由堆外内存MappedFile.writeBuffer提交到pagecache即MappedFile.mappedByteBuffer保存
long end = System.currentTimeMillis();
if (!result) {//result为true说明本次没有数据提交,具体产生该情况待提交的数据不满足4k,因此实际是没有提交。
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
flushCommitLogService.wakeup();//唤醒异步刷盘线程FlushRealTimeService,异步刷盘线程把消息从pagecache保存到磁盘
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}//end while
//如果broker被关闭,则执行到这里。由于是异步刷盘,此时磁盘数据落后commitlog,那么尽力吧commitlog数据提交到pagecache保存,这样尽管broker关闭了,但是数据是保存在操作系统,因此是不会丢失的
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);//把待刷盘的数据提交到pagecache保存
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
总的逻辑就是先把堆外内存数据提交到pagecache,然后唤醒异步刷盘服务线程进行刷盘,由异步刷盘线程吧pagecache内的数据最终刷新到磁盘上保存。
4.2.3三种刷盘方式比较总结
case1:同步刷盘GroupCommitService,producer发送消息到broker,broker保存消息到pagecache,然后同步等待数据落盘后,把结果返回producer,这样producer就知道具体消息是否发送成功。
异步刷盘FlushRealTimeService分两种情况
case2:未开启transientStorePoolEnable=true情况
producer发送消息到broker,broker保存消息到pagecache即MappedFile.mappedByteBuffer,唤醒异步刷盘线程
,然后把结果返回producer,这样producer收到返回结果发送成功,但是不一定就消息就真的保存到了磁盘,这样有小概率会丢失消息。
broker异步刷盘线程FlushRealTimeService
把pagecache的消息通过固定频率刷新保存到磁盘。
case3:开启transientStorePoolEnable=true情况
producer发送消息到broker,broker保存消息到堆外内存即MappedFile.writeBuffer,唤醒异步提交线程
,然后把结果返回producer,这样producer收到返回结果发送成功,但是不一定就消息就真的保存到了磁盘,这样有小概率会丢失消息。
broker异步提交线程CommitRealTimeService
把堆外内存的消息通过固定频率提交到pagecache,唤醒异步刷盘线程FlushRealTimeService,由该线程把pagecache消息刷新保存到磁盘。
总结:对于case1,不会丢失发送的消息,但是效率不高,因为把同步等待吧消息写入到磁盘速度是较慢的(rmq采用的是顺序写,速度也不慢)。对于case2、case3,发送消息效率高,但是如果服务器宕机,那么会丢失少量未提交保存的消息。对于case2,如果只是broker挂了,但是消息是保存在pagecache的,那么也不会丢失消息。对于case3,由于消息是写入到堆外内存的,如果broker挂了,自然就没有指向该堆外内存的指针了,因此堆外内存未提交到pagecache的少量消息会丢失,但是case3效率也是最高的,不仅发送消息效率高,而且消费效率也高,因为对于case3情况,是读写分离,消息发送是写入堆外内存,消息消费是从pagecache读取,消息的消费后续会再写博客。
4.2.4.刷盘的核心实现
从4.2.2可以看出,不论是异步刷盘or同步刷盘,最终都是执行org.apache.rocketmq.store.MappedFileQueue.flush(int)
具体代码加了注释,注释已经把完整功能都描述了,请看代码
/*
* 在刷盘服务线程FlushRealTimeService、GroupCommitService内刷盘调用
* 功能就是把pagecache内的数据刷新保存到磁盘
* 参数flushLeastPages在同步刷盘为0,异步刷盘为4
* 结果返回true说明本次刷盘并没有真正执行(没有数据刷新到磁盘),比如异步刷盘数据不满足4k的要求,结果为false说明有数据被刷新到了磁盘
*/
//org.apache.rocketmq.store.MappedFileQueue.flush(int)
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);//根据上次刷新的位置,得到当前的MappedFile对象
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();//获取mappedFile最后一天消息的存储时间戳
int offset = mappedFile.flush(flushLeastPages);//把数据保存到磁盘文件 并返回最新刷盘位置
long where = mappedFile.getFileFromOffset() + offset;//相对于MappedFileQueue的刷盘位置
result = where == this.flushedWhere;//如果有待刷新的数据被刷新到了磁盘,则返回false。如果此时broker很空闲,没有待写入的数据(即MappedFile.wrotePosition==MappedFile.flushedPosition),那么就返回true。还要就是异步刷盘不满足4k的刷盘要求,实际也并不进行刷盘,也返回true.
this.flushedWhere = where;//更新刷新位置为新刷新位置
if (0 == flushLeastPages) {
this.storeTimestamp = tmpTimeStamp;
}
}
return result;
}
/*
* 参数flushLeastPages在同步刷盘为0,异步刷盘为4
* 功能就是把pagecache内的数据刷新保存到磁盘
* 同步刷盘就是把flushedPosition->wrotePosition之间的数据刷新到磁盘,只要这俩个位置不同,就会每次执行都把数据刷新到磁盘
* 异步刷盘(未开启transientStorePoolEnable=true)就是把flushedPosition->wrotePosition之间的数据刷新到磁盘,只有这俩个位置差距大于4k情况,才会把数据刷新到磁盘,实际并不会每次执行flush操作都会把数据刷新到磁盘的
* 异步刷盘(开启transientStorePoolEnable=true)就是把committedPosition->wrotePosition之间的数据刷新到磁盘,只有这俩个位置差距大于4k情况,才会把数据刷新到磁盘,实际并不会每次执行flush操作都会把数据刷新到磁盘的
*/
//org.apache.rocketmq.store.MappedFile.flush(int)
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {//如果MappedFile被写满,则需要刷新。MappedFile.wrotePosition(或者committedPosition)>MappedFile.flushedPosition,则说明可以写(同步刷盘每次执行都刷新(只要有数据待刷新)),但是对于异步刷盘来说,只有满足待刷新数据大于4k才会刷盘,这样是为了性能考虑
if (this.hold()) {
int value = getReadPosition();//readPosition,同步刷盘和异步刷盘未开启transientStorePoolEnable=true的情况返回MappedFile.wrotePosition,异步刷盘且开启transientStorePoolEnable=true的情况返回MappedFile.committedPosition
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {//异步刷盘且开启了transientStorePoolEnable=true执行这里,把文件通道内的数据写入到磁盘保存
this.fileChannel.force(false);//如果开启了transientStorePoolEnable=true,则把堆外内存内的数据刷入到磁盘。这里采用的不是pagecache,那么在master宕机的时候,会损失消息
} else {//同步刷盘和异步刷盘未开启transientStorePoolEnable=true的情况走这里
this.mappedByteBuffer.force();//把pagecache内的数据刷入到磁盘
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);//更新刷盘位置flushedPosition为readPosition,对于同步刷盘和异步刷盘未开启transientStorePoolEnable=true的情况为wrotePosition or 对于异步刷盘且开启transientStorePoolEnable=true的情况为committedPosition
this.release();//每次刷盘动作实际不会释放pagecahe的物理内存,因为只有在MappedFile被销毁的时候才MappedFile.available才会被更新为false,只有在被更新为false情况下才会被释放
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();//返回刷盘位置MappedFile.flushedPosition
}
数据提交,即把堆外内存的数据提交到pagecache内,直接看代码和注释,功能和flush差不多
/*
* 传入参数commitLeastPages为0 or 4,为0表示需要强制提交,为4表示按照4k页提交。有强制提交是因为消息过少,那么满足4k的提交情况时间跨度比较大,而且关闭broker的时候(需要提交)消息不满足4k就不能提交了,因此有了强制提交
* 如果没有开启transientStorePoolEnable=true则不会执行这里。具体是在CommitReadTimeService服务线程执行
* 功能:把数据由堆外内存提交到pagecache,仅限于commitlog文件,并返回提交结果,为true表示本次并没有真正提交,为false表示本次真正提交了数据到pagecache
*/
//org.apache.rocketmq.store.MappedFileQueue.commit(int)
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);//根据上次提交位置,得到对应的MappedFile对象
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;//新提交点,可能跟原提交点相同,因为在commitLeastPages==4的情况下,不满足4k情况是不提交的
result = where == this.committedWhere;
this.committedWhere = where;//更新MappedFileQueue.committedWhere
}
return result;
}
/*
* 传入参数commitLeastPages为0 or 4,为0表示需要强制提交,为4表示按照4k页提交
* 把数据由堆外内存写入到pagecache内,然后返回新提交位置(即写的位置)
*/
//org.apache.rocketmq.store.MappedFile.commit(int)
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {//没有开启transientStorePoolEnable=true的时候,不把消息写入pagecache,返回写位置。即没有开启transientStorePoolEnable的情况下committedPosition就没有意义
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {//如果MappedFile被写满,则需要刷新。或者 MappedFile.wrotePosition与MappedFile.committedPosition满足待提交数据大于4k才会提交,这样是为了性能考虑
if (this.hold()) {
commit0(commitLeastPages);//把堆外内存committedPosition->wrotePosition之间的数据提交到pagecache,并更新committedPosition为wrotePosition
this.release();//每次提交动作实际不会释放pagecache的物理内存,因为只有在MappedFile被销毁的时候才MappedFile.available才会被更新为false,只有在被更新为false情况下才会被释放
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {//如果提交位置committedPosition达到了文件末尾,则该文件数据已经全部被提交到了pagecache,那么堆外内存就可以释放了
this.transientStorePool.returnBuffer(writeBuffer);//归还使用的堆外内存到缓冲池
this.writeBuffer = null;
}
return this.committedPosition.get();//返回最新的committedPosition
}
/*
* 把this.wrotePosition - this.committedPosition之间的数据写入到pagecache,同时把this.committedPosition更新为this.wrotePosition
*/
//org.apache.rocketmq.store.MappedFile.commit0(int)
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);//堆外内存committedPosition->wrotePosition之间的数据需要提交到fileChannel
this.fileChannel.position(lastCommittedPosition);//更新fileChannel位置
this.fileChannel.write(byteBuffer);//写入到pagecache
this.committedPosition.set(writePos);//更新committedPosition为wrotePosition
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
至此broker启动和broker运行写入消息,消息刷盘已经写完,但是对于文章最开始的几个位置点,需要重新说明下,这几个位置点很容易导致迷惑。
4.2.5.MappedFileQueue与MappedFile的几个位置说明
MappedFileQueue.flushedWhere 刷盘位置
MappedFileQueue.committedWhere 数据提交位置
MappedFile.wrotePosition 写位置
MappedFile.committedPosition 提交位置
MappedFile.flushedPosition 刷盘位置
看着中文含义,感觉不到区别,先看图,此图前面已经贴过了,再贴一次
再强调下,MappedFile对应一个个具体的文件,比如commitlog物理文件,但是MappedFileQueue对应的是一组物理文件,它们的关系就如图所示,wrotePosition,committedPosition,flushedPosition都是相对于单个物理文件来说的,而flushedWhere,committedWhere是相对于这一组物理文件来说的,以commitlog文件为例,每个MappedFile的区间范围是[(N-1)10243~N*10243],其中N表示是第几个文件,而MappedFileQueue的区间范围是[0~N10243]。这一组物理文件也可以用commitlog/consumequeue来表达,因为它们也都包装了MappedFileQueue。比如图中的offset就是相对于MappedFileQueue来说的,根据offset就可以得出具体所处的单个物理文件MappedFile,计算方式如下(offset/10243) - (mappedFile1.fileFromOffset/1024^) 即为该offset所属的MappedFile在MappedFileQueue的索引位置。
说下这几个位置的变化:
producer发送消息到broker,broker收到消息吧消息写入到MappedFile,则更新MappedFile.wrotePosition+=msglen。
case1:在同步刷盘和异步刷盘(未开启transientStorePoolEnable=true情况),刷盘线程吧待刷新数据刷新到磁盘内保存,更新MappedFile.flushedPosition=MappedFile.wrotePosition,更新MappedFileQueue.flushedWhere=MappedFile.fileFromOffset+MappedFile.flushedPosition,该情况下MappedFile.committedPosition、MappedFileQueue.committedWhere是无意义的。
case2:在异步刷盘(开启transientStorePoolEnable=true情况),提交线程吧待提交数据从堆外内存提交到pagecache保存,更新MappedFile.committedPosition=MappedFile.wrotePosition,MappedFileQueue.committedWhere=MappedFile.fileFromOffset+MappedFile.committedPosition,提交线唤醒异步刷盘线程,刷盘线程更新MappedFile.flushedPosition=MappedFile.committedPosition,更新MappedFileQueue.flushedWhere=MappedFile.fileFromOffset+MappedFile.flushedPosition
broker启动过程中
在commitlog load中,每个MappedFile的wrotePosition,committedPosition,flushedPosition都被更新为各自的文件size,即文件末尾。这样话broker关闭再启动,之前MappedFile可能只写了一半数据,再次启动,该MappedFile就不会再被写入,转而新键MappedFile进行写入,这样做就避免了在刷盘时候漏刷消息到磁盘,虽然浪费了些空间。
在recover中,MappedFileQueue.flushedWhere、committedWhere被恢复为lastMappedFile.fileFromOffset+commitlog文件中最大的消息位置(即broker关闭之前的MappedFileQueue.flushedWhere)。
5.commitlog内消息转储到consumequeue、indexfile
转储是在服务线程ReputMessageService内进行,该服务线程随broker启动而启动,如下堆栈
该服务线程每次休眠1ms,接着继续执行转储,这里休眠1ms是避免cps空沦陷导致100%,具体执行逻辑在org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService.doReput()内,代码如图
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
//省略不重要代码
// 获取从reputFromOffset%1024^3开始到最后一个MappedFile的wrotePotision的数据引用
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
//每读一轮,消息前4字节表示消息总长度,按消息存储结构读取,如果还有剩余的就继续读
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 读取一条消息,把消息包装为DispatchRequest
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 成功读取Message,更新ConsumeQueue里的位置信息,更新IndexFile
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {//如果是主broker且开启长轮询(broker默认开启长轮询),则通知阻塞的拉取线程进行拉取消息消费,这个是针对消费者消费但是暂时没有可消费的消息情况,即PULL_NOT_FOUND情况
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;//更新ReputMessageService转储commitlog消息的位置
readSize += size;
//省略不重要代码
} else if (size == 0) {// 读取到commitlog文件尾,则把this.reputFromOffset更新为下个commitlog的文件名(即起始位置)
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;//读取到的消息size>0,但是消息非法,那么跳过该消息,读取下条消息进行转储
} else {//消费不正确且size==0,跳出转储流程
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;//在消息读取失败时候,把this.reputFromOffset更新为最初的this.reputFromOffset+result.getSize(),跳过消息非法的这部分commitlog数据,并且结束for循环读取消息进行转储
}
}
}
}//for end
} finally {
result.release();
}
} else {
doNext = false;
}
}//外层for end
}
该方法的基本思想就是每次循环从commitlog读取一条消息,然后该该消息转储到consumequeue、indexfile。
具体的转储是在
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {// [CommitLogDispatcherCalcBitMap,CommitLogDispatcherBuildConsumeQueue,CommitLogDispatcherBuildIndex]
dispatcher.dispatch(req);
}
}
step1:CommitLogDispatcherCalcBitMap.dispatch(DispatchRequest),默认不开启enableCalcFilterBitMap,忽略。
step2:CommitLogDispatcherBuildConsumeQueue.dispatch(DispatchRequest),转储消息的offset到consumequeue。
step3:CommitLogDispatcherBuildIndex.dispatch(DispatchRequest)转储消息的index到indexfile,供查询。
5.1.转储消息offset到consumequeue,consumequeue刷盘
在org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue.dispatch(DispatchRequest)内执行,具体如下
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: // 非事务消息
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 事务消息COMMIT
// 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE: // 事务消息PREPARED
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // 事务消息ROLLBACK
break;
}
}
根据消息的sysflag来进行转储,如果是事务消息的prepared和rollback消息,则不进行转储,这也说明了为什么事务消息无法被消费者消费的原因,因为事务并没有保存到consumequeue,而消费者是从consumequeue进行消费的。
消息转储逻辑见代码
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();//true
for (int i = 0; i < maxRetries && canWrite; i++) {//最大重试30次,重试间隔时间是1s
long tagsCode = request.getTagsCode();
//省略不重要代码
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());//把消息写入到cq
if (result) {//消息offset转储到consumequeue成功
//消息offset转储到consumequeue成功,吧消息时间戳保存到存盘检测点logicsMsgTimestamp,供broker异常关闭启动时候恢复
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {//消息offset转储到consumequeue失败,则暂停1s接着继续进行转储该消息
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
/*
* 传入参数 offset是消息在整组commitlog文件中的位置,size即消息长度,tagsCode即消息的hashcode,cqOffset是消息在consumequeue的位置。
* offset和cqOffset区别,offset是消息在整组commitlog文件中的位置,cqOffset是整组consumequeue文件中的位置,即该topic下该queueID这个整组consumequeue文件中的位置,该值是消息保存到commitlog中从缓存获取的
* 功能就是把消息offset+size+hashcode追加到consumerqueue文件内,即追加到consumequeue文件的pagecache内
*/
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// 如果已经转储过,直接返回成功
if (offset <= this.maxPhysicOffset) {
return true;
}
// 写入位置信息到byteBuffer
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
按照consumequeue格式8+4+8存储
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
// 计算consumeQueue存储位置,并获得对应的MappedFile
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);//获取expectLogicOffset位置对应的具体mappedfile,如果不存在,则创建
if (mappedFile != null) {
// 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();//相对于该组consumequeue文件的位置
//expectLogicOffset正常情况应该是等于currentLogicOffset
if (expectLogicOffset < currentLogicOffset) {//会发生重复转储,因此不再进行转储
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset;//更新为消息在整组commitlog文件中的位置
return mappedFile.appendMessage(this.byteBufferIndex.array());//把消息offset+size+hashcode追加到consumerqueue文件内,即追加到pagecache
}
return false;
}
消息转储到consumeque和消息保存到commitlog类似,都是最终保存到pagecache,那么既然保存到了pagecache,自然就有刷新到磁盘的操作,在FlushConsumeQueueService服务线程内进行刷盘,最终也是调用org.apache.rocketmq.store.MappedFileQueue.flush(int)进行刷盘,这个和commitlog异步刷盘基本相同,前面commitlog刷盘明白了,这个自然也明白,consumequeue默认是待刷盘数据满足操作系统页2页才刷盘,但是如果超过最大时间,也会强制刷盘,同commitlog异步刷盘。
5.2.保存消息index到indexfile,indexfile刷盘
保存消息的index到indexfile的入口是CommitLogDispatcherBuildIndex.dispatch(DispatchRequest),前面也说过,indexfile对应一个具体的MappedFile,而IndexService是对应整组MappedFile(即整组indexfile)。
indexfile文件格式
indexfile文件size=40+4500w+202000w=420000040字节 约400M
添加消息索引的关键方法是putkey,看下面代码
/*
* 把keyhash 物理offset存储到indexfile上,即存储到MappedFile的pagecache上
*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {//已使用的index数目<2000w,则写入,否则说明indexfile被写满了
int keyHash = indexKeyHashMethod(key);//获取key的hashcode
int slotPos = keyHash % this.hashSlotNum;//取模获取该key要落于哪个slot上
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//计算该索引在indexfile内的slot绝对位置
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//更新indexfile是ReputMessageService单线程操作,无需加锁
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//获取该key在indexfile绝对位置上的slot值
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();//消息在commitlog存储的时间戳与indexfile第一条消息索引时间戳之间的时间差
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {//说明该条消息前没有在该indexfile存储索引,因此该条消息即是该indexfile的第一条索引记录
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {//说明broker异常关闭启动后新消息的时间戳小于该indexfile的第一条消息时间戳,可能重复创建索引
timeDiff = 0;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;//计算该索引要存放在indexfile上索引区域的绝对位置
//按照索引格式更新索引位置 20字节
this.mappedByteBuffer.putInt(absIndexPos, keyHash);//存储hashcode
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//接着存储消息在commitlog的物理偏移量
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//消息的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//如果没有hash冲突,slotValue为0,如果hash冲突(即多个index的hash落在同一个slot内),
//更新slot位置 4字节。这个是难理解地方,也是核心地方,slot上存放的是当前索引的个数,那么进行查找的时候根据keyhash定位到slot后,获取了slotValue,该值就是索引数量,那么可以根据该值获取该key对应的索引在indexfile上索引的真正位置
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//代码@1
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();//已使用的hashslot数量+1,这个没具体什么意义
this.indexHeader.incIndexCount();//已经使用的index数量+1,用于判断indexfile是否被写满
this.indexHeader.setEndPhyOffset(phyOffset);//更新indexheader的结束物理位置为最新的消息的物理位置,每次刷新一条索引,都会更新
this.indexHeader.setEndTimestamp(storeTimestamp);//更新indexheader的结束时间戳为最新的消息的存储时间戳,每次刷新一条索引,都会更新
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {//fileLock为null,不进入
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
本质就是把消息keyhash、消息在commitlog的物理偏移量、时间戳存放到MappedFile的pagecache上,难点是代码@1处,该处代码是核心,解释如下:
代码@1处是把索引区域的slotvalue更新为该keyhash落于的hash槽的值,这个理解麻烦些,更新为这个有什么用呢?既然是通过hash来选择落于哪个slot内,那么必然存在hash冲突,更新为该值为对应slot上存放的是当前索引的个数,那么进行查找的时候根据keyhash定位到slot后,获取了slotValue,该值就是索引数量indexCount,那么可以根据indexCount该值获取该key对应的索引在indexfile上索引的真正位置,公式就是absIndexPos=40+4500w+indexCount20,那么在进行查询的时候,先根据key得到slotvalue即indexCount,继而计算出absIndexPos,继而获取absIndexPos的keyhash和查询的keyHash比较,相同则匹配,接着获取该索引位置的前一个preslotvalue,即preindexCount进行遍历,获取keyhash等同查询keyHash的值,这样看起来就像个链表结构,这个设计也很巧妙,使用索引数量连接冲突的hashkey。
具体根据消息key查询是org.apache.rocketmq.store.index.IndexFile.selectPhyOffset(List
那么有没有查询时候slotvalue为0的情况,当然有,为0说明该key不存在或者暂时没有被ReputMessageService服务线程刷新消息的索引到indexfile文件,这样情况就返回结果是null了。
org.apache.rocketmq.store.index.IndexFile.selectPhyOffset(List
/*
* 根据topic+key+开始时间戳+结束时间戳+最大查询条数来查询消息,查询到的消息phyOffset保存到集合phyOffsets
*/
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//获取待查询的key所归属的slot绝对位置
//省略不重要代码
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
//为slotValue==0说明该key不存在或者暂时没有被ReputMessageService服务线程刷新消息的索引到indexfile文件,这样情况就返回结果是null了。
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//获取keyhash
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//获取消息在commitlog上的物理偏移量
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//时间戳
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);//即前preIndexCount
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {//hash相同且时间匹配,吧该消息在commitlog上的物理偏移量保存到phyOffsets集合
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {//preSlotvalue为0退出循环
break;
}
//prevIndexRead!=0则说明该slot位置有hash冲突,需要继续根据prevIndexRead进行查询
nextIndexToRead = prevIndexRead;
}
}
//省略不重要代码
}
}
在ReputMessageService服务线程中把消息key和offset保存到indexfile,即保存到MappedFile的pagecache属性,但是并没有保存到磁盘,具体是在哪里保存到磁盘呢?indexfile刷盘和commitlog、consumequeue刷盘是不同的,broker并没有针对indexfile刷盘起一个线程服务用于刷盘indexfile,为什么呢?因为一个indexfile可以保存2000w个消息索引,不像commitlog、consumequeue需要频繁刷盘保存消息到磁盘防止系统崩溃丢失。indexfile的刷盘最终操作是org.apache.rocketmq.store.index.IndexFile.flush(),该操作的调用堆栈如图
而org.apache.rocketmq.store.index.IndexService.getAndCreateLastIndexFile()的调用堆栈如图
原来是ReputMessageService线程转储消息offset、key到indexfile的时候,如果当前indexfile满了,则新建一个indexfile,把写满的indexfile进行刷盘保存。这样做也有风险,如果服务器宕机了,那么当前indexfile的索引就全部丢失了,单身indexfile只是用于查询,不是重要的消息数据,丢失可以容忍。实际上查询消息在indexfile丢失情况下,可以直接通过grep "消息hex字符串" commitlog文件 这样方法可以搜索到的。
6.结语
至此吧broker启动、broker接收producer消息到消息存储和刷盘。commitlog、consumequeue、indexfile与MappedFileQueue、MappedFile的关系说明白了,平时自己都是记录笔记,没有写博客习惯,以后养成。第一次写博客,写的太长了,博客应该短些好。
参考:丁威 《rocketmq技术内幕》,阅读源码参考这个书效果很好。
最后贴个大图,记录下broker启动的流程图
RocketMQ源码分析 broker启动,commitlog、consumequeue、indexfile、MappedFileQueue、MappedFile之间的关系以及位置说明