Fescar Example Walkthrough - TM Send Logic

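Introduction

 This article aims to clarify the logic of the TM sending path in Fescar, analyzing it at two levels: the sequence diagram and the source code.

 Along the way it answers two questions I ran into while reading the code (most readers will probably hit them too): how TmRpcClient is initialized and how configuration is loaded.

 The end of the article includes a class diagram of the GlobalAction-related Request classes, to make the dependencies easier to follow.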



Fescar TM Send Flow

[Figure: sequence diagram of the TM send flow]

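Notes:

  • 1. DefaultGlobalTransaction calls DefaultTransactionManager to perform begin/commit/rollback and similar operations.
  • 2. DefaultTransactionManager calls its internal syncCall() method, which in turn calls TmRpcClient's sendMsgWithResponse() method.
  • 3. TmRpcClient calls sendAsyncRequestWithResponse(), inherited from AbstractRpcRemoting, which delegates to sendAsyncRequest() to put the message on the send queue.
  • 4. The MergedSendRunnable thread of AbstractRpcRemotingClient consumes the send queue, builds a MergedWarpMessage, and calls sendRequest() to send it.
  • 5. sendRequest() calls writeAndFlush to complete the send.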



[Figure: TmRpcClient class dependency diagram]
Notes:

  • The figure above shows the class dependencies of TmRpcClient.
  • TmRpcClient extends AbstractRpcRemotingClient.


Fescar TM Send Source Code Analysis

public class DefaultTransactionManager implements TransactionManager {

    private static class SingletonHolder {
        private static final TransactionManager INSTANCE = new DefaultTransactionManager();
    }

    /**
     * Get transaction manager.
     *
     * @return the transaction manager
     */
    public static TransactionManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultTransactionManager() {

    }

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) 
            throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setTransactionId(txId);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setTransactionId(txId);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        long txId = XID.getTransactionId(xid);
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setTransactionId(txId);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

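Notes:

  • The begin/commit/rollback methods of DefaultTransactionManager (and getStatus as well) all end up calling syncCall().
  • syncCall() executes TmRpcClient.getInstance().sendMsgWithResponse(request), handing the request to TmRpcClient; a TimeoutException is rethrown as a TransactionException with code IO.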


public final class TmRpcClient extends AbstractRpcRemotingClient {
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
    }

    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
        throws TimeoutException {
        return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
    }
}

Notes:

  • TmRpcClient's sendMsgWithResponse() calls sendAsyncRequestWithResponse().
  • sendAsyncRequestWithResponse() is implemented in the ancestor class AbstractRpcRemoting.


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
        TimeoutException {
        if (timeout <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        return sendAsyncRequest(address, channel, msg, timeout);
    }

    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
        throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }

        // Build the RpcMessage object
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(RpcMessage.getNextMessageId());
        rpcMessage.setAsync(false);
        rpcMessage.setHeartbeat(false);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);

        // Wrap the message in a MessageFuture to support timeouts
        final MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        futures.put(rpcMessage.getId(), messageFuture);

        // The example code takes this branch
        if (address != null) {
            // Put the message into the queue (basket) kept for this address in basketMap
            ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
            BlockingQueue<RpcMessage> basket = map.get(address);
            if (basket == null) {
                map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = map.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }

            // The actual send is performed by a separate thread
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
        } else {
            ChannelFuture future;
            channelWriteableCheck(channel, msg);
            future = channel.writeAndFlush(rpcMessage);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(future.cause());
                        }
                        destroyChannel(future.channel());
                    }
                }
            });
        }

        // Wait on the future with a time limit
        if (timeout > 0) {
            try {
                return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException)exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        } else {
            return null;
        }
    }
}

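Notes:

  • Build an RpcMessage that wraps the Request.
  • Build a MessageFuture that wraps the RpcMessage and provides the wait-with-timeout behavior.
  • Messages are bucketed by address into baskets; the code that actually sends them is MergedSendRunnable in AbstractRpcRemotingClient.
  • Sending a Request follows a producer-consumer model; the code above is only the producer side.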
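
The producer side described in these notes can be sketched in isolation. This is a minimal sketch, not Fescar's code: ProducerSketch, enqueue, await and complete are hypothetical names, CompletableFuture stands in for MessageFuture, and a bare message id stands in for RpcMessage.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the producer half of AbstractRpcRemoting.
class ProducerSketch {
    // One pending queue ("basket") per server address.
    final ConcurrentHashMap<String, BlockingQueue<Long>> basketMap = new ConcurrentHashMap<>();
    // Pending responses, keyed by message id.
    final ConcurrentHashMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Registers a future and puts the message id into the basket for the address.
    long enqueue(String address) {
        long id = nextId.incrementAndGet();
        futures.put(id, new CompletableFuture<>());
        basketMap.computeIfAbsent(address, a -> new LinkedBlockingQueue<>()).offer(id);
        return id;
    }

    // Blocks until the response for this id arrives or the timeout elapses.
    Object await(long id, long timeoutMillis) throws TimeoutException {
        try {
            return futures.get(id).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // The pending entry is dropped whether the wait succeeded or not.
            futures.remove(id);
        }
    }

    // Called by the receiving side when a response carrying this id comes back.
    void complete(long id, Object response) {
        CompletableFuture<Object> future = futures.get(id);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

The caller thread only enqueues and waits; nothing in this class writes to the network, which mirrors why the real send has to happen on another thread.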


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {}
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) { continue; }

                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = connect(address);
                    try {
                        sendRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
                            && address != null) {
                            destroyChannel(address, sendChannel);
                        }
                        LOGGER.error("", "client merge call failed", e);
                    }
                }
                isSending = false;
            }
        }
    }
}

Notes:

  • MergedSendRunnable consumes the pending message bodies and assembles them into a MergedWarpMessage.
  • sendRequest() wraps the MergedWarpMessage in another RpcMessage and sends it.
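
The consumer side can be sketched the same way. The sketch below is an assumption-laden simplification: MergeSketch, offer and drainOnce are hypothetical names, plain strings stand in for message bodies, and a single call to drainOnce() corresponds to one pass of the while loop in MergedSendRunnable (the real loop also waits on mergeLock for up to MAX_MERGE_SEND_MILLS between passes).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for the merge step of MergedSendRunnable.
class MergeSketch {
    // One pending queue ("basket") per server address.
    final ConcurrentHashMap<String, BlockingQueue<String>> basketMap = new ConcurrentHashMap<>();

    // Producer side: queue a message body for the given address.
    void offer(String address, String msg) {
        basketMap.computeIfAbsent(address, a -> new LinkedBlockingQueue<>()).offer(msg);
    }

    // One consumer pass: drain every non-empty basket into a single batch per address.
    Map<String, List<String>> drainOnce() {
        Map<String, List<String>> merged = new LinkedHashMap<>();
        for (Map.Entry<String, BlockingQueue<String>> entry : basketMap.entrySet()) {
            List<String> batch = new ArrayList<>();
            entry.getValue().drainTo(batch);
            if (!batch.isEmpty()) {
                merged.put(entry.getKey(), batch);
            }
        }
        return merged;
    }
}
```

Each batch returned here plays the role of one MergedWarpMessage: several requests queued for the same address leave in a single write.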


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {

    protected void sendRequest(Channel channel, Object msg) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setAsync(true);
        rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
        rpcMessage.setRequest(true);
        rpcMessage.setBody(msg);
        rpcMessage.setId(RpcMessage.getNextMessageId());
        if (msg instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
        }
        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(rpcMessage);
    }
}

Notes:

  • The MergeMessage is wrapped in a new RpcMessage before it is sent.


TmRpcClient Initialization

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {

    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                    FailureHandler failureHandlerHook) {
        setOrder(ORDER_NUM);
        setProxyTargetClass(true);
        this.applicationId = applicationId;
        this.txServiceGroup = txServiceGroup;
        this.mode = mode;
        this.failureHandlerHook = failureHandlerHook;
    }

    private void initClient() {

        TMClient.init(applicationId, txServiceGroup);
        if ((AT_MODE & mode) > 0) {
            RMClientAT.init(applicationId, txServiceGroup);
        }
    }

    public void afterPropertiesSet() {
        initClient();
    }
}

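Notes:

  • After the GlobalTransactionScanner constructor has run, afterPropertiesSet() is invoked (the class implements InitializingBean), and it calls initClient().
  • initClient() calls TMClient.init(applicationId, txServiceGroup) to initialize the TMClient.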


public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(
                 applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }
}

public final class TmRpcClient extends AbstractRpcRemotingClient {
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
        }
    }


    public void init(long healthCheckDelay, long healthCheckPeriod) {
        // Note the initVars() method
        initVars();

        ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
           MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
           KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, 
           new LinkedBlockingQueue<Runnable>(),
           new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), 
                        MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    reconnect();
                } catch (Exception ignore) {
                    LOGGER.error(ignore.getMessage());
                }
            }
        }, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
    }


    private void initVars() {
        enableDegrade = CONFIG.getBoolean(
        ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
        super.init();
    }
}

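Notes:

  • The key thing to look at is the initVars() method, which reads the degrade switch from the configuration and then calls super.init().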
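
Before initVars() is reached, the no-argument init() guards the whole sequence with initialized.compareAndSet(false, true). A minimal sketch of that run-once guard follows; InitOnceSketch and initCount are hypothetical names used only to make the effect observable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the run-once guard used by TmRpcClient.init().
class InitOnceSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Counts how many times the guarded body actually ran.
    final AtomicInteger initCount = new AtomicInteger();

    void init() {
        // Only the first caller flips false -> true; every later call is a no-op.
        if (initialized.compareAndSet(false, true)) {
            initCount.incrementAndGet();
        }
    }
}
```

With this guard, calling init() repeatedly does not start a second merge-send thread or a second reconnect timer.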


public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RemotingService, RegisterMsgListener, ClientMessageSender {

    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        // Core step: build the connection pool of objects used for sending
        nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig());
        serviceManager = new ServiceManagerStaticConfigImpl();
        super.init();
    }
}


public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
    public void init() {
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

                for (MessageFuture future : futures.values()) {
                    if (future.isTimeout()) {
                        timeoutMessageFutures.add(future);
                    }
                }

                for (MessageFuture messageFuture : timeoutMessageFutures) {
                    futures.remove(messageFuture.getRequestMessage().getId());
                    messageFuture.setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                    }
                }
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }
}

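Notes:

  • The core of AbstractRpcRemotingClient.init() is building nettyClientKeyPool from a NettyPoolableFactory.
  • nettyClientKeyPool is the keyed object pool from which connections to the TC are obtained.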
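
To make the idea of a keyed pool concrete, here is a minimal sketch using only the JDK. It is not GenericKeyedObjectPool: KeyedPoolSketch is a hypothetical class, it has no size limits, validation, or eviction, and it only illustrates that idle objects are kept per key and reused before the factory is asked for a new one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, heavily simplified keyed object pool (one idle stack per key).
class KeyedPoolSketch<K, V> {
    private final Map<K, Deque<V>> idle = new HashMap<>();
    // Creates a new object for a key when no idle one is available.
    private final Function<K, V> factory;

    KeyedPoolSketch(Function<K, V> factory) {
        this.factory = factory;
    }

    // Returns an idle object for the key if one exists, otherwise creates one.
    synchronized V borrowObject(K key) {
        Deque<V> stack = idle.get(key);
        if (stack != null && !stack.isEmpty()) {
            return stack.pop();
        }
        return factory.apply(key);
    }

    // Hands an object back so a later borrow for the same key can reuse it.
    synchronized void returnObject(K key, V value) {
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).push(value);
    }
}
```

In the real client the key would identify a TC server address and the pooled value a Netty channel, so one pool can serve several servers.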


Configuration Loading Analysis


public class FileConfiguration implements Configuration {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);

    private static final Config CONFIG = ConfigFactory.load();
}


package com.typesafe.config;
public final class ConfigFactory {
    private ConfigFactory() {
    }

    public static Config load() {
        return load(ConfigParseOptions.defaults());
    }
}

Notes:

  • Configuration loading uses Typesafe Config (com.typesafe.config), a configuration library for Java.
  • ConfigFactory.load() loads application.conf, application.json, and application.properties from the classpath by default.
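
For illustration, a classpath application.conf that would satisfy the enableDegrade lookup in initVars() might look like the fragment below. The literal key is an assumption: the article does not show the values of ConfigurationKeys.SERVICE_PREFIX and ConfigurationKeys.ENABLE_DEGRADE_POSTFIX, so service.enableDegrade is only a guess at what they concatenate to.

```
# application.conf (HOCON), hypothetical minimal example
service {
  # Assumed key name; check ConfigurationKeys for the real prefix and postfix
  enableDegrade = false
}
```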

Request Class Diagram

[Figure: class diagram of the GlobalAction-related Request classes]
