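Introduction
This article walks through the sending side of the Fescar TM (Transaction Manager), looking at it from two angles: the sequence diagram and the source code.
Along the way it answers two questions that puzzled me while reading the code (and that most readers will probably run into as well): how TmRpcClient is initialized, and how configuration is loaded.
The end of the article includes a class diagram of the GlobalAction-related Request classes to make the dependencies easier to follow.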
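Fescar TM send flow
Notes:
- 1. DefaultGlobalTransaction performs begin/commit/rollback by calling DefaultTransactionManager.
- 2. DefaultTransactionManager calls its internal syncCall() method, which in turn calls sendMsgWithResponse() on TmRpcClient.
- 3. TmRpcClient calls sendAsyncRequest(), inherited from AbstractRpcRemoting, which places the message on a send queue.
- 4. The MergedSendRunnable thread in AbstractRpcRemotingClient consumes the send queue, builds a MergedWarpMessage, and sends it through sendRequest().
- 5. sendRequest() calls writeAndFlush to put the message on the wire.
Notes:
- The class dependency diagram of TmRpcClient is shown above.
- TmRpcClient extends AbstractRpcRemotingClient.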
Fescar TM send path: source code analysis
public class DefaultTransactionManager implements TransactionManager {
private static class SingletonHolder {
private static final TransactionManager INSTANCE = new DefaultTransactionManager();
}
/**
* Get transaction manager.
*
* @return the transaction manager
*/
public static TransactionManager get() {
return SingletonHolder.INSTANCE;
}
private DefaultTransactionManager() {
}
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setTransactionId(txId);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setTransactionId(txId);
GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
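Notes:
- The begin/commit/rollback/getStatus methods of DefaultTransactionManager all end up calling syncCall().
- syncCall() hands the request to TmRpcClient via TmRpcClient.getInstance().sendMsgWithResponse(request) and converts a TimeoutException into a TransactionException.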
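The pattern in DefaultTransactionManager (a holder-based singleton plus one syncCall() choke point that translates timeouts) can be sketched in isolation. This is a minimal illustration, not Fescar code: SyncCallSketch, Sender and TxException are hypothetical stand-ins for DefaultTransactionManager, TmRpcClient and TransactionException.

```java
import java.util.concurrent.TimeoutException;

/** Sketch only: all names here are hypothetical stand-ins, not Fescar classes. */
class SyncCallSketch {
    /** Stand-in for TransactionException: a checked, domain-level failure. */
    static class TxException extends Exception {
        TxException(Throwable cause) { super(cause); }
    }

    /** Stand-in for TmRpcClient: one blocking request/response call. */
    interface Sender {
        Object send(Object request) throws TimeoutException;
    }

    // Initialization-on-demand holder: INSTANCE is created when Holder is first used.
    private static class Holder {
        static final SyncCallSketch INSTANCE = new SyncCallSketch();
    }

    private SyncCallSketch() { }

    static SyncCallSketch get() { return Holder.INSTANCE; }

    /** Every operation funnels through here, so timeouts are translated in one place. */
    Object syncCall(Sender sender, Object request) throws TxException {
        try {
            return sender.send(request);
        } catch (TimeoutException e) {
            throw new TxException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // A fake sender that answers immediately.
        Object xid = get().syncCall(req -> "xid-for-" + req, "begin");
        System.out.println(xid);
    }
}
```

Keeping the exception translation in a single private method means begin, commit, rollback and getStatus stay three or four lines each.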
public final class TmRpcClient extends AbstractRpcRemotingClient {
@Override
public Object sendMsgWithResponse(Object msg) throws TimeoutException {
return sendMsgWithResponse(msg, NettyClientConfig.getRpcRequestTimeout());
}
@Override
public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout)
throws TimeoutException {
return sendAsyncRequestWithResponse(serverAddress, connect(serverAddress), msg, timeout);
}
}
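Notes:
- TmRpcClient's sendMsgWithResponse() delegates to sendAsyncRequestWithResponse().
- sendAsyncRequestWithResponse() is implemented in the ancestor class AbstractRpcRemoting (TmRpcClient extends AbstractRpcRemotingClient, which extends AbstractRpcRemoting).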
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
TimeoutException {
if (timeout <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
return sendAsyncRequest(address, channel, msg, timeout);
}
private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
throws TimeoutException {
if (channel == null) {
LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
return null;
}
// Build the RpcMessage
final RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setId(RpcMessage.getNextMessageId());
rpcMessage.setAsync(false);
rpcMessage.setHeartbeat(false);
rpcMessage.setRequest(true);
rpcMessage.setBody(msg);
// Wrap it in a MessageFuture so the caller can wait with a timeout
final MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeout);
futures.put(rpcMessage.getId(), messageFuture);
// The test code takes this branch
if (address != null) {
// Look up (or create) the basket, i.e. the queue, for this address
ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
basket = map.get(address);
}
basket.offer(rpcMessage);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: " + rpcMessage.getBody());
}
// The actual send is done by a separate thread; wake it up if it is idle
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
} else {
ChannelFuture future;
channelWriteableCheck(channel, msg);
future = channel.writeAndFlush(rpcMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
}
});
}
// Block on the future, bounded by the timeout
if (timeout > 0) {
try {
return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
if (exx instanceof TimeoutException) {
throw (TimeoutException)exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
return null;
}
}
}
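Notes:
- An RpcMessage is built to wrap the request.
- A MessageFuture is built to wrap the RpcMessage; it provides the wait-with-timeout behaviour.
- Messages are bucketed into per-address baskets (queues). The code that actually sends them lives in MergedSendRunnable of AbstractRpcRemotingClient.
- Sending follows a producer-consumer model; the code above is only the producer side.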
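The producer side described in the notes above can be reduced to a small stand-alone sketch. It assumes CompletableFuture as a substitute for Fescar's MessageFuture, and ProducerSketch, Msg, enqueue, complete and await are hypothetical names. Unlike the original, this sketch removes the future in await() itself rather than leaving cleanup to a periodic task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch only: hypothetical names, CompletableFuture stands in for MessageFuture. */
class ProducerSketch {
    static final class Msg {
        final long id;
        final Object body;
        Msg(long id, Object body) { this.id = id; this.body = body; }
    }

    // Pending responses, keyed by message id.
    final ConcurrentHashMap<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    // One queue ("basket") per server address.
    final ConcurrentHashMap<String, BlockingQueue<Msg>> basketMap = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Registers a pending future and drops the message into the address's basket. */
    long enqueue(String address, Object body) {
        long id = nextId.incrementAndGet();
        futures.put(id, new CompletableFuture<>());
        basketMap.computeIfAbsent(address, a -> new LinkedBlockingQueue<>()).offer(new Msg(id, body));
        return id;
    }

    /** Called by whoever receives the response for a given message id. */
    void complete(long id, Object response) {
        CompletableFuture<Object> f = futures.get(id);
        if (f != null) {
            f.complete(response);
        }
    }

    /** Blocks until the response arrives or the timeout elapses. */
    Object await(long id, long timeoutMillis) throws TimeoutException {
        CompletableFuture<Object> f = futures.get(id);
        if (f == null) {
            throw new IllegalArgumentException("unknown message id " + id);
        }
        try {
            return f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            futures.remove(id); // always clean up the pending entry
        }
    }
}
```

The caller thread never touches the network here: it only enqueues and waits, which is what lets a separate thread merge several queued messages into one write.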
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
implements RemotingService, RegisterMsgListener, ClientMessageSender {
public class MergedSendRunnable implements Runnable {
@Override
public void run() {
while (true) {
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {}
}
isSending = true;
for (String address : basketMap.keySet()) {
BlockingQueue<RpcMessage> basket = basketMap.get(address);
if (basket.isEmpty()) { continue; }
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage)msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = connect(address);
try {
sendRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable
&& address != null) {
destroyChannel(address, sendChannel);
}
LOGGER.error("", "client merge call failed", e);
}
}
isSending = false;
}
}
}
Notes:
- MergedSendRunnable consumes the queued messages and assembles them into a MergedWarpMessage, one per address.
- sendRequest() then wraps the MergedWarpMessage in another RpcMessage and sends it.
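The consumer step of MergedSendRunnable, draining each per-address basket into one merged batch, can be sketched as follows. MergeSendSketch, Batch and drain are hypothetical names, messages are plain strings, and the wait/notify loop and the actual network write are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch only: hypothetical names; shows the merge step, not the network send. */
class MergeSendSketch {
    /** Stand-in for a merged message: everything queued for one address. */
    static final class Batch {
        final String address;
        final List<String> msgs = new ArrayList<>();
        Batch(String address) { this.address = address; }
    }

    /** Drains every non-empty basket into one batch per address. */
    static List<Batch> drain(Map<String, BlockingQueue<String>> basketMap) {
        List<Batch> batches = new ArrayList<>();
        for (Map.Entry<String, BlockingQueue<String>> e : basketMap.entrySet()) {
            Batch batch = new Batch(e.getKey());
            e.getValue().drainTo(batch.msgs); // moves all currently queued messages
            if (!batch.msgs.isEmpty()) {
                batches.add(batch);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, BlockingQueue<String>> baskets = new ConcurrentHashMap<>();
        baskets.computeIfAbsent("tc-1:8091", a -> new LinkedBlockingQueue<>()).add("begin");
        baskets.computeIfAbsent("tc-1:8091", a -> new LinkedBlockingQueue<>()).add("commit");
        baskets.computeIfAbsent("tc-2:8091", a -> new LinkedBlockingQueue<>()).add("rollback");
        for (Batch b : drain(baskets)) {
            System.out.println(b.address + " -> " + b.msgs);
        }
    }
}
```

Batching per address means several concurrent transactions talking to the same TC share one write instead of issuing one write each.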
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
protected void sendRequest(Channel channel, Object msg) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setAsync(true);
rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
rpcMessage.setRequest(true);
rpcMessage.setBody(msg);
rpcMessage.setId(RpcMessage.getNextMessageId());
if (msg instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
}
channelWriteableCheck(channel, msg);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
}
channel.writeAndFlush(rpcMessage);
}
}
Notes:
- sendRequest() wraps the MergeMessage in a new RpcMessage and sends it.
TmRpcClient initialization
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean {
public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
FailureHandler failureHandlerHook) {
setOrder(ORDER_NUM);
setProxyTargetClass(true);
this.applicationId = applicationId;
this.txServiceGroup = txServiceGroup;
this.mode = mode;
this.failureHandlerHook = failureHandlerHook;
}
private void initClient() {
TMClient.init(applicationId, txServiceGroup);
if ((AT_MODE & mode) > 0) {
RMClientAT.init(applicationId, txServiceGroup);
}
}
public void afterPropertiesSet() {
initClient();
}
}
Notes:
- GlobalTransactionScanner implements InitializingBean, so after its constructor has run, afterPropertiesSet() is invoked and calls initClient().
- initClient() calls TMClient.init(applicationId, txServiceGroup) to initialize the TMClient.
public class TMClient {
public static void init(String applicationId, String transactionServiceGroup) {
TmRpcClient tmRpcClient = TmRpcClient.getInstance(
applicationId, transactionServiceGroup);
tmRpcClient.init();
}
}
public final class TmRpcClient extends AbstractRpcRemotingClient {
public void init() {
if (initialized.compareAndSet(false, true)) {
init(SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS);
}
}
public void init(long healthCheckDelay, long healthCheckPeriod) {
// Pay attention to initVars()
initVars();
ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX),
MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
reconnect();
} catch (Exception ignore) {
LOGGER.error(ignore.getMessage());
}
}
}, healthCheckDelay, healthCheckPeriod, TimeUnit.SECONDS);
}
private void initVars() {
enableDegrade = CONFIG.getBoolean(
ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
super.init();
}
}
Notes:
- The method to focus on is initVars(): it reads the configuration and then calls super.init().
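The initialization chain is easy to lose track of because init() exists at several levels of the hierarchy. The following sketch mirrors only the call order; Remoting, RemotingClient and TmClient are hypothetical stand-ins for AbstractRpcRemoting, AbstractRpcRemotingClient and TmRpcClient, and each step is merely recorded as a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: hypothetical classes that record the order of init steps. */
class InitChainSketch {
    static class Remoting {
        final List<String> steps = new ArrayList<>();
        void init() { steps.add("start timeout sweeper"); }
    }

    static class RemotingClient extends Remoting {
        @Override
        void init() {
            steps.add("build connection pool");
            super.init();
        }
    }

    static class TmClient extends RemotingClient {
        private final AtomicBoolean initialized = new AtomicBoolean(false);

        @Override
        void init() {
            // compareAndSet guarantees the body runs at most once, even under concurrency.
            if (initialized.compareAndSet(false, true)) {
                initVars();
                steps.add("start merge-send thread");
            }
        }

        private void initVars() {
            steps.add("read config");
            super.init(); // resolves to RemotingClient.init(), not TmClient.init()
        }
    }

    public static void main(String[] args) {
        TmClient client = new TmClient();
        client.init();
        client.init(); // second call is a no-op
        System.out.println(client.steps);
    }
}
```

The point to notice is that the super.init() call inside initVars() bypasses the subclass override, so the guard in TmClient.init() is not re-entered.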
public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
implements RemotingService, RegisterMsgListener, ClientMessageSender {
public void init() {
NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
// Core step: build the pool of connection objects used for sending
nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
nettyClientKeyPool.setConfig(getNettyPoolConfig());
serviceManager = new ServiceManagerStaticConfigImpl();
super.init();
}
}
public abstract class AbstractRpcRemoting extends ChannelDuplexHandler {
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());
for (MessageFuture future : futures.values()) {
if (future.isTimeout()) {
timeoutMessageFutures.add(future);
}
}
for (MessageFuture messageFuture : timeoutMessageFutures) {
futures.remove(messageFuture.getRequestMessage().getId());
messageFuture.setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
}
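Notes:
- The key job of AbstractRpcRemotingClient.init() is to build nettyClientKeyPool.
- nettyClientKeyPool is a keyed object pool, backed by NettyPoolableFactory, from which the connections to the TC are obtained.
- AbstractRpcRemoting.init() schedules a periodic task that removes timed-out MessageFutures from the futures map and completes them with null.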
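The periodic timeout cleanup that AbstractRpcRemoting.init() schedules can be sketched on its own. This is an assumption-laden illustration: TimeoutSweepSketch, Pending, register, sweep and start are hypothetical names, and CompletableFuture stands in for MessageFuture. The sweep takes the current time as a parameter so that it can be exercised deterministically.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch only: hypothetical names; CompletableFuture stands in for MessageFuture. */
class TimeoutSweepSketch {
    static final class Pending {
        final long deadlineMillis;
        final CompletableFuture<Object> result = new CompletableFuture<>();
        Pending(long deadlineMillis) { this.deadlineMillis = deadlineMillis; }
    }

    final ConcurrentHashMap<Long, Pending> futures = new ConcurrentHashMap<>();

    Pending register(long id, long deadlineMillis) {
        Pending p = new Pending(deadlineMillis);
        futures.put(id, p);
        return p;
    }

    /** One pass: drop every expired entry and release its waiter with a null result. */
    int sweep(long nowMillis) {
        int swept = 0;
        for (Map.Entry<Long, Pending> e : futures.entrySet()) {
            Pending p = e.getValue();
            // remove(key, value) only succeeds if the entry is still the one we inspected.
            if (nowMillis > p.deadlineMillis && futures.remove(e.getKey(), p)) {
                p.result.complete(null);
                swept++;
            }
        }
        return swept;
    }

    /** Runs sweep() at a fixed rate on the given timer. */
    ScheduledFuture<?> start(ScheduledExecutorService timer, long periodMillis) {
        return timer.scheduleAtFixedRate(
            () -> sweep(System.currentTimeMillis()),
            periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

Without such a sweep, a response that never arrives would leave its entry in the map forever, so the map would grow with every lost message.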
Configuration loading
public class FileConfiguration implements Configuration {
private static final Logger LOGGER = LoggerFactory.getLogger(FileConfiguration.class);
private static final Config CONFIG = ConfigFactory.load();
}
package com.typesafe.config;
public final class ConfigFactory {
private ConfigFactory() {
}
public static Config load() {
return load(ConfigParseOptions.defaults());
}
}
Notes:
- Configuration loading relies on Typesafe Config (com.typesafe.config), a configuration library for Java.
- ConfigFactory.load() by default loads application.conf, application.json and application.properties from the classpath.