看一下客户端收到消息后的处理:
MQClientAPIImpl#processPullResponse
private PullResult processPullResponse( final RemotingCommand response) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break; default: throw new MQBrokerException(response.getCode(), response.getRemark()); } PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); }
得到PullResult对象后,MQClientAPIImpl#pullMessageAsyncm,回调pullCallback.onSuccess():
private void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); assert pullResult != null; pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }
PullCallback#onSuccess:
调用pullAPIWrapper的processPullResult将消息字节属组解码成消息列表填充msgFoundList,并对消息进行消息过滤(TAG模式)。
public class PullResult {
private final PullStatus pullStatus;//拉取结果 private final long nextBeginOffset; //下次拉取偏移量
private final long minOffset; //消息队列最小偏移量 private final long maxOffset; //消息队列最大偏移量 private List<MessageExt> msgFoundList;//具体拉取的消息列表
回到onSuccess
可以看到这里如果消息列表为空 马上进行了重试,为什么PullStatus.FOUND,msgFoundList还会为空呢?因为在RocketMQ根据TAG消息过滤,在服务端只是验证了TAG的hashcode,在客户端再次对消息进行过滤,故可能出现msgFoundList为空的情况。
else:
将拉取的消息提交到ConsumeMessageService中公消费者消费(异步)。
根据pullInterval参数,等待pullInterval毫秒后将PullRequest对象放入到PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时消息拉取的效果。
异常码处理:
NO_NEW_MSG和NO_MATCHED_MSG会直接使用服务端矫正后的偏移量进行一次重试。
OFFSET_ILLEGAL:
case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true);//丢弃该消费队列,ProcessQueue中拉取的消息将停止消费 DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {//更新消息的消费进度,并持久化,并将该消息队列从rebalanceImpl
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;