this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来通知ConsumeQueue的Dispatcher是CommitLogDispatcherBuildConsumeQueue。
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
当ReputMessageService调用了CommitLogDispatcherBuildConsumeQueue的dispatch()后,CommitLogDispatcherBuildConsumeQueue便会调用 DefaultMessageStore.this.putMessagePositionInfo(request):
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
putMessagePositionInfo()逻辑有两步:
1:调用findConsumeQueue(),根据消息的topic以及消息所属的ConsumeQueueId,找到对应的ConsumeQueue。
findConsumeQueue()会先从consumeQueueTable中查询topic的ConsumeQueueMap,如果未找到,便会为Topic创建一个新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。
接着在从Topic的ConcurrentMap中,根据QueueId,查询ConsumeQueue,如果未找到,便也会创建一个新的ConsumeQueue,存放到Map中。ConsumeQueue便是此时被创建的。
2:当找到消息对应的ConsumeQueue后,便调用ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。
ConsumeQueue的更新
上面主要讲了ReputMessageService是如何通知ConsumeQueue的,现在我们就要看看ConsumeQueue收到通知后是如何更新的,更新逻辑就在putMessagePositionInfoWrapper()中。
putMessagePositionInfoWrapper()中调用了putMessagePositionInfo(),并引入了重试机制。
我们来看看putMessagePositionInfo()中的主要逻辑:
1:判断消息是否已经被处理过
if (offset <= this.maxPhysicOffset) {
return true;
}
maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量,如果本次消息偏移量小于maxPhysicOffset,则表明消息已经被更新过,直接返回。