CommunicationMode
public enum CommunicationMode {
SYNC, // 同步
ASYNC, // 异步
ONEWAY, // 发出去就不管了
}
重试次数
# 同步默认 = 1+ 2 =3 , # 非同步(异步、OneWay) = 1 (总共就1次,其实就是没有重试)
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
发送容错
发送容错
broker
SendMessageProcessor
实现接口
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
}
processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
response = asyncProcessRequest(ctx, request).get(); // Future.get()
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) { // 批量消息
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else { // 非批量消息
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}