【mq读书笔记】客户端处理消息

看一下客户端收到消息后的处理:

MQClientAPIImpl#processPullResponse

private PullResult processPullResponse(
        final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                pullStatus = PullStatus.FOUND;
                break;
            case ResponseCode.PULL_NOT_FOUND:
                pullStatus = PullStatus.NO_NEW_MSG;
                break;
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                pullStatus = PullStatus.NO_MATCHED_MSG;
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                pullStatus = PullStatus.OFFSET_ILLEGAL;
                break;

            default:
                throw new MQBrokerException(response.getCode(), response.getRemark());
        }

        PullMessageResponseHeader responseHeader =
            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

        return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
            responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
    }

得到PullResult对象后,MQClientAPIImpl#pullMessageAsyncm,回调pullCallback.onSuccess():

 private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
    ) throws RemotingException, InterruptedException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
    }

PullCallback#onSuccess:

 

【mq读书笔记】客户端处理消息

 

 

调用pullAPIWrapper的processPullResult将消息字节属组解码成消息列表填充msgFoundList,并对消息进行消息过滤(TAG模式)。

public class PullResult {
   private final PullStatus pullStatus;//拉取结果 private final long nextBeginOffset;  //下次拉取偏移量
private final long minOffset;  //消息队列最小偏移量 private final long maxOffset;  //消息队列最大偏移量 private List<MessageExt> msgFoundList;//具体拉取的消息列表

 

回到onSuccess

【mq读书笔记】客户端处理消息

 

 

可以看到这里如果消息列表为空 马上进行了重试,为什么PullStatus.FOUND,msgFoundList还会为空呢?因为在RocketMQ根据TAG消息过滤,在服务端只是验证了TAG的hashcode,在客户端再次对消息进行过滤,故可能出现msgFoundList为空的情况。

 

else:

【mq读书笔记】客户端处理消息

 

 

将拉取的消息提交到ConsumeMessageService中公消费者消费(异步)。

 

 

【mq读书笔记】客户端处理消息

 

 

根据pullInterval参数,等待pullInterval毫秒后将PullRequest对象放入到PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时消息拉取的效果。

异常码处理:

 

【mq读书笔记】客户端处理消息

 

 

NO_NEW_MSG和NO_MATCHED_MSG会直接使用服务端矫正后的偏移量进行一次重试。

OFFSET_ILLEGAL:

case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);//丢弃该消费队列,ProcessQueue中拉取的消息将停止消费
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {//更新消息的消费进度,并持久化,并将该消息队列从rebalanceImpl
                    
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
 
 

 

上一篇:RocketMQ源码之 consumer是怎样消费消息的


下一篇:java版时间战士