Inside the sendDefaultImpl method:
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            continue;
        } catch (MQClientException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            continue;
            // ... excerpt truncated here
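In sync mode, a send that completes without an exception is only treated as final if the returned status is SEND_OK. If the status is anything else and isRetryAnotherBrokerWhenNotStoreOK() is enabled, the loop continues and retries; if that option is off, the result is returned as-is. A RemotingException or MQClientException also sends the loop into the next attempt.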
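The sync retry decision can be looked at in isolation. The following is a minimal sketch, not RocketMQ code: SyncRetrySketch, Status, selectOne, and the kernel function are hypothetical stand-ins for the producer, SendStatus, selectOneMessageQueue, and sendKernelImpl, and broker selection is reduced to a round-robin that skips the broker used last.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not RocketMQ code: sync send with retry on another broker.
class SyncRetrySketch {
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT }

    // Pick the next broker in round-robin order, skipping the one used last if possible.
    static String selectOne(List<String> brokers, String lastBroker, int start) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((start + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        return brokers.get(start % brokers.size());
    }

    static Status send(List<String> brokers, int timesTotal, boolean retryWhenNotOk,
                       Function<String, Status> kernel) {
        String lastBroker = null;
        Status lastStatus = null;
        RuntimeException lastError = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectOne(brokers, lastBroker, times);
            lastBroker = broker;
            try {
                lastStatus = kernel.apply(broker);
                if (lastStatus != Status.SEND_OK && retryWhenNotOk) {
                    continue; // status is not OK and retry is enabled: next attempt
                }
                return lastStatus;
            } catch (RuntimeException e) {
                lastError = e; // the send itself failed: next attempt
            }
        }
        if (lastStatus != null) {
            return lastStatus; // attempts used up, report the last status seen
        }
        throw lastError != null ? lastError : new IllegalStateException("no send attempted");
    }

    public static void main(String[] args) {
        Status s = send(Arrays.asList("broker-a", "broker-b"), 3, true, b -> {
            if (b.equals("broker-a")) {
                throw new RuntimeException("broker-a unreachable");
            }
            return Status.SEND_OK;
        });
        System.out.println(s);
    }
}
```

In main, the first attempt against broker-a throws, so the loop moves on to broker-b and returns its status.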
In oneway and async mode, the loop simply returns null. So how does async mode retry?
The core send function called from sendDefaultImpl is sendKernelImpl:
SendResult sendResult = null;
switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            msg.setBody(prevBody);
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
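As shown, sendMessage is called with different arguments in async and non-async mode. The async call additionally passes this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed().
Leaving async retry aside for the moment: RocketMQ limits how many oneway and async requests can be in flight at the same time. Unlike sync sends, these calls do not block the sending thread until a response arrives, so without a cap a fast producer could tie up too many resources.
The limit is implemented with semaphores: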
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
    this.semaphoreOneway = new Semaphore(permitsOneway, true);
    this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
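For oneway, a successful this.semaphoreOneway.tryAcquire means the thread obtained a permit. Once writeAndFlush has been called and no exception is thrown, the message is considered sent and nothing further is checked. The permit is released in the write-completion listener, and a failed write is only logged.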
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
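The permit handling in the oneway path can be sketched on its own. This is a rough illustration under assumptions, not RocketMQ code: OnewayLimiterSketch and tryBegin are hypothetical names, and the release handle only imitates what SemaphoreReleaseOnlyOnce appears to do in the excerpt, using an AtomicBoolean so the permit goes back at most once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not RocketMQ code: cap in-flight requests with a fair semaphore.
class OnewayLimiterSketch {
    private final Semaphore permits;

    OnewayLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns a release handle if a permit was obtained in time, or null otherwise.
    // The handle gives the permit back at most once, however often it is run.
    Runnable tryBegin(long timeoutMillis) {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> {
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        OnewayLimiterSketch limiter = new OnewayLimiterSketch(1);
        Runnable release = limiter.tryBegin(10);
        System.out.println(limiter.tryBegin(10) == null); // the only permit is taken
        release.run();
        release.run(); // second run does nothing
        System.out.println(limiter.available());
    }
}
```

Releasing at most once matters because the permit may be handed back from more than one place (the completion listener and the catch block in the excerpt); a double release would silently raise the cap.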
For a synchronous call:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
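As in Dubbo, a map of futures, responseTable, is kept as a member. After the request is written, the calling thread blocks on its future. What is it waiting for? Since this is a synchronous request, it is waiting for the response. With Netty, the response generally arrives on a different thread, so the caller blocks here. Below is the blocking and wake-up code of ResponseFuture: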
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}

public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}
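putResponse is called in one of two places. Either the send has already failed in the listener shown in invokeSyncImpl, in which case null is put immediately, or the response arrives and is handled in processResponseCommand. Once the response is obtained, the synchronous request is complete.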
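The register, block, and wake-up sequence can be reproduced with plain JDK classes. The sketch below is an assumption-laden illustration, not RocketMQ code: SyncCallSketch, PendingReply, call, and onReply are hypothetical names, replies are plain strings, and the "network" is whatever Runnable the caller passes in.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not RocketMQ code: a blocking call built on a latch per request.
class SyncCallSketch {
    static final class PendingReply {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        // Blocks until put() is called or the timeout passes; returns null on timeout.
        String await(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return reply;
        }

        void put(String value) {
            reply = value;
            latch.countDown();
        }
    }

    private final ConcurrentHashMap<Integer, PendingReply> table = new ConcurrentHashMap<>();

    // Caller side: register the future, trigger the send, then block for the reply.
    String call(int opaque, Runnable send, long timeoutMillis) {
        PendingReply pending = new PendingReply();
        table.put(opaque, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } finally {
            table.remove(opaque);
        }
    }

    // I/O thread side: match the reply to its request by opaque and wake the caller.
    boolean onReply(int opaque, String reply) {
        PendingReply pending = table.remove(opaque);
        if (pending == null) {
            return false; // no caller is waiting for this id
        }
        pending.put(reply);
        return true;
    }

    int pending() {
        return table.size();
    }

    public static void main(String[] args) {
        SyncCallSketch client = new SyncCallSketch();
        String reply = client.call(7, () -> new Thread(() -> client.onReply(7, "pong")).start(), 2000);
        System.out.println(reply);
    }
}
```

Registering the future before sending is the important ordering: if the reply arrives before the caller starts waiting, the latch is already counted down and await returns immediately.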
For an asynchronous request, the sending thread can return as soon as the request has been sent.
Async messages are handled here:
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }
            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}
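This method is long, but it really contains a single statement, this.remotingClient.invokeAsync; everything else is the InvokeCallback callback. The callback receives a ResponseFuture and inspects the response in it to decide whether to enter onExceptionImpl. onExceptionImpl in turn calls back into sendMessageAsync to retry, until the async retry count is used up.
Note that async sending does not mean the sender ignores failures entirely: if an exception is thrown while the request is being submitted, it propagates directly and the caller sees it.
If the request was submitted but no response came back (the write failed, the wait timed out, or the cause is unknown), onExceptionImpl is called with true as its second-to-last argument and RocketMQ retries automatically. If a response did arrive but processSendResponse threw while handling it, the same argument is false, which suggests that this case is reported to the callback without a retry.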
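The retry-from-the-callback idea can be shown with a toy transport. This is a minimal sketch under assumptions, not RocketMQ code: AsyncRetrySketch, Transport, and Callback are hypothetical, a null response stands for "no response arrived", and the shared AtomicInteger plays the role of the times counter passed through sendMessageAsync.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch, not RocketMQ code: async send that retries from its own callback.
class AsyncRetrySketch {
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Hypothetical transport: hands the response, or null on failure, to onComplete.
    interface Transport {
        void invokeAsync(String request, Consumer<String> onComplete);
    }

    static void sendAsync(Transport transport, String request, int retryTimes,
                          AtomicInteger times, Callback callback) {
        transport.invokeAsync(request, response -> {
            if (response != null) {
                callback.onSuccess(response);
                return;
            }
            // No response: count the failure and resend while retries remain.
            int failures = times.incrementAndGet();
            if (failures <= retryTimes) {
                sendAsync(transport, request, retryTimes, times, callback);
            } else {
                callback.onException(new RuntimeException("no response after " + failures + " attempts"));
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fails on the first two calls, then answers.
        Transport flaky = (req, done) -> done.accept(calls.incrementAndGet() < 3 ? null : "ok");
        sendAsync(flaky, "hello", 2, new AtomicInteger(), new Callback() {
            @Override
            public void onSuccess(String result) {
                System.out.println(result + " after " + calls.get() + " calls");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("failed: " + e.getMessage());
            }
        });
    }
}
```

The user's callback fires exactly once, either with the first successful response or after the retry budget is spent, which is why the sending thread never needs to loop.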
Going deeper into this.remotingClient.invokeAsync:
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    }
}
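This has parts in common with both the oneway and the sync call above: it checks whether a semaphore permit can be acquired, builds a future, and puts it into the map keyed by opaque. It also stores the InvokeCallback from sendMessageAsync in the future. As noted earlier, async retry is implemented inside InvokeCallback, so InvokeCallback is ultimately invoked through the future.
So far we have only sent the message and put the future that carries the retrying InvokeCallback into the map. When is that entry consumed?
1 When the send fails outright, in which case requestFail(opaque) is called: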
private void requestFail(final int opaque) {
    ResponseFuture responseFuture = responseTable.remove(opaque);
    if (responseFuture != null) {
        responseFuture.setSendRequestOK(false);
        responseFuture.putResponse(null);
        try {
            executeInvokeCallback(responseFuture);
        } catch (Throwable e) {
            log.warn("execute callback in requestFail, and callback throw", e);
        } finally {
            responseFuture.release();
        }
    }
}
2 When the response is received:
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        responseTable.remove(opaque);
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
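This is much like the sync call, but there is one more case to handle: timeout. A sync call detects a timeout in the blocked thread itself, whereas an async call has to rely on a background thread that scans the table: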
public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }
    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
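The scan can be tried out without Netty. The sketch below is hypothetical and simplified, not RocketMQ code: TimeoutScanSketch, Pending, and register are invented for illustration, the current time is passed in as a parameter so the behaviour is deterministic, and the 1000 ms grace period mirrors the constant in the excerpt above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not RocketMQ code: expire pending requests by scanning a table.
class TimeoutScanSketch {
    static final long GRACE_MILLIS = 1000; // same slack as in the excerpt

    private static final class Pending {
        final long beginMillis;
        final long timeoutMillis;
        final Runnable onTimeout;

        Pending(long beginMillis, long timeoutMillis, Runnable onTimeout) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
            this.onTimeout = onTimeout;
        }
    }

    private final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    void register(int opaque, long beginMillis, long timeoutMillis, Runnable onTimeout) {
        table.put(opaque, new Pending(beginMillis, timeoutMillis, onTimeout));
    }

    int size() {
        return table.size();
    }

    // Removes every entry whose deadline plus grace has passed, then runs its callback.
    // Returns the number of expired entries.
    int scan(long nowMillis) {
        List<Pending> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            if (p.beginMillis + p.timeoutMillis + GRACE_MILLIS <= nowMillis) {
                it.remove();
                expired.add(p);
            }
        }
        // Callbacks run after the removal pass, so a failing callback cannot stop the scan.
        for (Pending p : expired) {
            try {
                p.onTimeout.run();
            } catch (Throwable t) {
                System.err.println("timeout callback failed: " + t);
            }
        }
        return expired.size();
    }

    public static void main(String[] args) {
        TimeoutScanSketch table = new TimeoutScanSketch();
        table.register(1, 0, 3000, () -> System.out.println("request 1 timed out"));
        System.out.println(table.scan(3999)); // still within timeout plus grace
        System.out.println(table.scan(4000)); // expired now
    }
}
```

In a real client the scan would be driven by a periodic task calling scan(System.currentTimeMillis()); here the clock is injected only to keep the example deterministic.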