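I read through the TCC part of the Seata source code, in particular the logic where the TC invokes the commit of each branch transaction. Along the way I saw how Seata makes a synchronous call on top of Netty, and I thought the approach was quite neat.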
AbstractNettyRemoting # sendSync
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    channelWritableCheck(channel, rpcMessage.getBody());

    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });

    try {
        return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {
MessageFuture is Seata's own future class, a wrapper around CompletableFuture.
public class MessageFuture {
    private RpcMessage requestMessage;
    private long timeout;
    private long start = System.currentTimeMillis();
    private transient CompletableFuture<Object> origin = new CompletableFuture<>();

    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result = null;
        try {
            result = origin.get(timeout, unit);
        } catch (ExecutionException e) {
            throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
        } catch (TimeoutException e) {
            throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");
        }

        if (result instanceof RuntimeException) {
            throw (RuntimeException)result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable)result);
        }

        return result;
    }

    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }
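To make the blocking behaviour concrete, here is a minimal sketch of the same idea outside Seata. `ResultFutureSketch`, `setResult` and `demo` are hypothetical names invented for this illustration, not Seata classes or methods. It shows two things visible in the excerpt above: `get` blocks until another thread calls `complete`, and a result that happens to be a `Throwable` is rethrown to the waiting caller rather than returned.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a MessageFuture-style wrapper; not Seata's class. */
class ResultFutureSketch {
    private final CompletableFuture<Object> origin = new CompletableFuture<>();

    /** Blocks until setResult(...) is called or the timeout elapses. */
    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result;
        try {
            result = origin.get(timeout, unit);
        } catch (ExecutionException e) {
            // Not expected here: origin is only ever completed normally.
            throw new IllegalStateException(e);
        }
        // A Throwable passed as the "result" is surfaced as an exception.
        if (result instanceof RuntimeException) {
            throw (RuntimeException) result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable) result);
        }
        return result;
    }

    /** Wakes up whichever thread is blocked in get(...). */
    public void setResult(Object obj) {
        origin.complete(obj);
    }

    /** Completes a future from a second thread and reports what the waiter observed. */
    public static String demo(Object payload) {
        ResultFutureSketch future = new ResultFutureSketch();
        Thread completer = new Thread(() -> future.setResult(payload));
        completer.start();
        try {
            return "ok:" + future.get(1, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            return "failed:" + e.getMessage();
        } catch (TimeoutException | InterruptedException e) {
            return "no-result";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("pong"));
        System.out.println(demo(new IllegalStateException("write failed")));
    }
}
```

The second call in `main` mirrors the write-failure listener in `sendSync`: completing the future with the failure cause lets the caller fail immediately instead of waiting for the timeout.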
The key question is when setResultMessage actually gets called.
AbstractNettyRemotingServer # ServerHandler
class ServerHandler extends ChannelDuplexHandler {

    /**
     * Channel read.
     *
     * @param ctx the ctx
     * @param msg the msg
     * @throws Exception the exception
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        processMessage(ctx, (RpcMessage) msg);
    }
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
ServerOnResponseProcessor # process
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    MessageFuture messageFuture = futures.remove(rpcMessage.getId());
    if (messageFuture != null) {
        messageFuture.setResultMessage(rpcMessage.getBody());
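The principle is this: before the server sends a message, it constructs a future and caches it in a map, keyed by the id of the outgoing message. The handler registered on the Netty server implements channelRead, where the id is read back out of the message returned by the client. That id is used to look up the cached future, which is then completed with the message body via CompletableFuture.complete. As a result, the blocking call in the synchronous send logic
try {
    return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
can return.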
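The whole round trip can be sketched without Netty. Everything below is a hypothetical illustration: `SyncCallSketch`, `fakePeer`, `onResponse` and `roundTrip` are names made up for this example, and a single-thread executor stands in for the remote peer and the network. Removing the entry in `finally` is this sketch's own choice for cleaning up after a timeout; the Seata excerpt above is cut off before its catch block, so it makes no claim about what Seata does there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of id-keyed request/response correlation; no Netty involved. */
class SyncCallSketch {
    private final ConcurrentHashMap<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();
    // Stands in for the remote peer plus the network (assumption of this sketch).
    private final ExecutorService fakePeer = Executors.newSingleThreadExecutor();

    /** Registers a future under a fresh id, "sends" the request, then blocks for the reply. */
    Object sendSync(String body, long timeoutMillis) throws TimeoutException {
        int id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future); // register before sending so a fast reply is never missed
        fakePeer.execute(() -> onResponse(id, "ack:" + body));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            futures.remove(id); // no-op after a reply, cleanup after a timeout
        }
    }

    /** Plays the role of the read handler: look up the future by id and complete it. */
    void onResponse(int id, Object body) {
        CompletableFuture<Object> future = futures.remove(id);
        if (future != null) {
            future.complete(body); // unblocks the thread waiting in sendSync
        }
    }

    int pending() {
        return futures.size();
    }

    void shutdown() {
        fakePeer.shutdown();
    }

    /** Runs one round trip and returns the reply, or "timeout" if none arrived in time. */
    static String roundTrip(String body) {
        SyncCallSketch sketch = new SyncCallSketch();
        try {
            return String.valueOf(sketch.sendSync(body, 1000));
        } catch (TimeoutException e) {
            return "timeout";
        } finally {
            sketch.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("branch-commit"));
    }
}
```

The caller sees an ordinary blocking method, while the reply actually arrives on a different thread. The message id is the only thing linking the two sides, which is why a response with an unknown id is simply ignored.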