简述
本文章针对rocketMq 开源版本4.8.0进行分析;rocketMq 流程简述如下:
nameserv:看作注册中心,broker 启动要注册到nameserv中,同时要定时向nameserv发送心跳来告诉nameserv它还活着,然后nameserv除了维护broker信息,还要维护topic的信息,比如一个topic 消息发送到哪几个broker上,然后topic 分为几个message queue等等,现在我们只需要简单知道这个nameserv 维护这broker信息与topic 信息。
broker:一个实例,存储消息队列等
producer:其发送消息分三种模式:同步发送,异步,单向发送,单向就是发送之后就不管结果了,从消息生产角度出发的;
consumer:消息消费与确认消费,消费方式通过queue_offerset 去consumeQueue查找physical_offerset,并通过physical_offerset查到对应消息内容
它们的连接方式通过netty 连接
消息存储:
broker 主节点接收消息后优先存储到commitLog,然后返回消息存储情况;每个broker 节点通过定时任务生成consumeQueue,BuildIndex ;其中consumeQueue用于消费者使用,存储的核心数据为physical_offset;BuildIndex 为消息建立关键字索引
消息存储分三大块讲解:
commitLog ----本文章讲解
commitLog 流程概述
broker收到消息后交给commitLog来存储,commitLog由让干个MappedFile组成,在物理上对应为ROCKET_HOME/commitlog/00000000000000000000,
每个文件默认大小是1G,文件的起名为起始offset加上每个文件的大小,比如第二个文件名为00000000001073741824,文件的读写采用内存映射技术(MMP),写采用追加写的模式,消息的offset其实就是每个消息在整个commintLog起始位置;
源码分析
broker在启动的时候,会将某个code对应的processor注册到server上,不同类型的消息交给不同的处理器去处理,比如说SEND_MESSAGE 这个code的消息就会交给SendMessageProcessor处理,会调用对应processor的 processRequest 方法来处理,我们消息生产者发送消息的时候,就是使用的SEND_MESSAGE 这个code ,接下来我们看下SendMessageProcessor 的processRequest 方法;其中其调用链为
BrokerStartup.start() -》 BrokerController.start() -》 NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() -》 new NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() -》 NettyRemotingAbstract.processRequestCommand() -》 SendMessageProcessor.processRequest()
下面核心对SendMessageProcessor源码分析;
根据code 进入红线部分
过程中有一些不重点介绍,根据源码我们会走到DefaultMessageStore#putMessage
接下来我们来看putMessage 我们这次讲的是普通消息存储,就自动跳过事务代码逻辑部分
// 获取最新的一个mappFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 获取锁,保持消息的顺序性
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
// 重新创建一个文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//向mappedFile 追加消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
// 当剩余空间不能存储消息的时候,会把之前消息补全,并设置为空,返回此状态,重新创建新的mappeFile ,在追加消息
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
代码里有我的注释,下面重点介绍消息的追加
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
//获取写的位置
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
//获取新的字节缓存区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
// 重新设置新的位置
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBrokerInner) {
// 追加消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// 更新当前mappedFile 开始位置
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
doAppend 内容太多,我们只看核心的部分
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE 消息大小
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
上面这段代码很重要,为消息的关键属性如下
wroteOffset:这个就是physicsOffset很关键,以后查找消息的关键属性,定位了消息的物理位置
消息的长度:消息内容
queueoffset :队列的偏移量,我们也可以叫为逻辑offset 这个是根据队列id 原子增长的
由于消息内容是肯定的,我们通过physicsOffset 可以找到各个属性的值,包含消息内容;
通过physicsOffset定位消息的开始位置,就可以取到所需数据,针对消息内容各属性是变化的,都会在之前把内容的大小存储起来,这样就可以截取到完整的消息内容了;
最后就是根据配置同步或者异步刷盘
追加成功后解锁,刷盘,根据配置是否同步刷盘,因为这个时候我们存储还在内存,避免数据消失我们可以配置同步刷盘;
总结
commitlog是一个逻辑上的大文件,下面有很多MappedFile,每个消息都有一个physicsOffset(物理offset),后面不管是构建索引还是ConsumeQueue,存储的都是这个commitlog的offset;
有了physicsOffset 就能很快定位到具体消息开始位置,根据消息内容设置规律原则,就可以拿到消息体各个属性内容,主要有queueId,queue offset ,physicsOffset,消息内容,