问题
- index 文件有什么作用,结构又是如何
概述
index 文件主要是为了 message key 服务的,rocketmq 发送消息的时候可以带上 key , messge key 是为了标识某个消息的一个标志。
思考
我们思考一下,message key 是由用户生成的,我们需要尽可能地保证散列保存,这样当我们就可以快速地拿出来了。那么通常的作法就是利用哈希散列,当然最重要的是如何解决冲突。我们下面看一下rocketmq 是如何实现的。
总体思路
下面两张图片来自参考文章。侵删(作者的文章写得真的好)
我们从这里可以看到index文件分为三部分,头,散列值,索引文件。其中散列值会一一对应索引文件中的一个值,该值就是储存该message信息的。
可以看到假如有冲突(即找到散列值那个位置的时候已经有一个对应的索引位了),那么索引位就存放在新的索引位的“上一个索引位”的属性里,这里就形成一条单链表。
源码分析
index文件和其他consumerQueue文件的思路是一样的,同样是利用持久化在文件中,然后通过mappedFile 文件加载到内存中,开启一个服务,当发消息对消息进行持久化的时候,将消息的key持久化在index文件。
写入
DefaultMessageStore$$CommitLogDispatcherBuildIndex内部类
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this.indexService.buildIndex(request); } } }
该内部类就是当接受到消息对index进行记录的分发器,可以看到最主要的还是利用了indexService,我们来看一下indexService到底执行了什么操作。
从方法名我们对index文件的加载,会刷,获取,写入等。我们看一下 buildIndex 方法
IndexFile&putKey方法
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { //获取hashCode 后取模 int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; //计算绝对位置 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } //为什么要把 hashSlot 那部分的总长也加进来呢? //因为这个pos是相对于 mapfile的位移量进行获取的 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); //先保存上一个槽位的值 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); //然后覆盖掉旧的 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } //更新index属性 this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; } //直接取 hashCode,并返回正数 public int indexKeyHashMethod(final String key) { int keyHash = key.hashCode(); int keyHashPositive = Math.abs(keyHash); if (keyHashPositive < 0) keyHashPositive = 0; return keyHashPositive; }
读取的方法
读取方法的调用链如下 : DefaultMessageStore&queryMessage -> indexService&queryOffset -> IndexFile&selectPhyOffset
DefaultMessageStore&queryMessage 方法
@Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult(); long lastQueryMsgTime = end; //重试 for (int i = 0; i < 3; i++) { //查找到返回结果 QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; } //封装返回结果 Collections.sort(queryOffsetResult.getPhyOffsets()); queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { long offset = queryOffsetResult.getPhyOffsets().get(m); try { boolean match = true; MessageExt msg = this.lookMessageByOffset(offset); if (0 == m) { lastQueryMsgTime = msg.getStoreTimestamp(); } // String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); // if (topic.equals(msg.getTopic())) { // for (String k : keyArray) { // if (k.equals(key)) { // match = true; // break; // } // } // } if (match) { SelectMappedBufferResult result = this.commitLog.getData(offset, false); if (result != null) { int size = result.getByteBuffer().getInt(0); result.getByteBuffer().limit(size); result.setSize(size); queryMessageResult.addMessage(result); } } else { log.warn("queryMessage hash duplicate, {} {}", topic, key); } } catch (Exception e) { log.error("queryMessage exception", e); } } if (queryMessageResult.getBufferTotalSize() > 0) { break; } if (lastQueryMsgTime < begin) { break; } } return queryMessageResult; }
indexService&queryOffset 逻辑很好懂,定位文件,解决冲突的链表进行查找
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { List<Long> phyOffsets = new ArrayList<Long>(maxNum); long indexLastUpdateTimestamp = 0; long indexLastUpdatePhyoffset = 0; maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); try { this.readWriteLock.readLock().lock(); if (!this.indexFileList.isEmpty()) { for (int i = this.indexFileList.size(); i > 0; i--) { IndexFile f = this.indexFileList.get(i - 1); boolean lastFile = i == this.indexFileList.size(); if (lastFile) { indexLastUpdateTimestamp = f.getEndTimestamp(); indexLastUpdatePhyoffset = f.getEndPhyOffset(); } if (f.isTimeMatched(begin, end)) { f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); } if (f.getBeginTimestamp() < begin) { break; } if (phyOffsets.size() >= maxNum) { break; } } } } catch (Exception e) { log.error("queryMsg exception", e); } finally { this.readWriteLock.readLock().unlock(); } return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); }
总结
参考资料
- https://www.kunzhao.org/blog/2018/04/08/rocketmq-message-index-flow/