开篇
这篇文章是接着Fescar example解析 - TM流程的下一步分析,主要是对TM的处理逻辑的进一步分析,理清楚TM(Transaction Manager )的处理步骤以及代码调用链。
这篇文章的结论是TM执行事务操作包括begin/commit/rollback都是通过DefaultTransactionManager类来实现,实现形式是TM和TC进行网络通信,在整个TM->TC的过程中TM担当了Client端的角色,TC担当了Server端的角色。
背景介绍
事务资料摘自Fescar概览。
与XA 的模型类似,我们定义 3 个组件来协议分布式事务的处理过程。
- Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
- Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
- Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
一个典型的分布式事务过程:
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
- XID 在微服务调用链路的上下文中传播。
- RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
- TM 向 TC 发起针对 XID 的全局提交或回滚决议。
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
执行过程
说明,整个执行流程如下:
- 1.TransactionalTemplate通过GlobalTransactionContext.getCurrentOrCreate()返回GlobalTransaction对象。
- 2.GlobalTransactionContext的createNew()方法创建DefaultGlobalTransaction对象。
- 3.DefaultGlobalTransaction的构造方法当中创建DefaultTransactionManager对象。
- 4.TransactionalTemplate通过DefaultGlobalTransaction执行begin/commit/rollback等操作。
- 5.DefaultGlobalTransaction内部通过DefaultTransactionManager执行begin/commit/rollback等操作。
源码解析
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. 创建一个GlobalTransaction对象
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. 通过GlobalTransaction开始执行事务
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
说明:
- 创建tx对象,GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate()。
- 执行全局事务,tx.beigin()及其他省略的一部分代码。
public class GlobalTransactionContext {
private static final ThreadLocal<GlobalTransaction> THREAD_TRANSACTION_CONTEXT = new ThreadLocal<>();
private GlobalTransactionContext() {
}
// 创建GlobalTransaction对象
private static GlobalTransaction createNew() {
GlobalTransaction tx = new DefaultGlobalTransaction();
THREAD_TRANSACTION_CONTEXT.set(tx);
return THREAD_TRANSACTION_CONTEXT.get();
}
public static GlobalTransaction getCurrent() {
GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
if (tx != null) {
return tx;
}
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
tx = new DefaultGlobalTransaction(xid);
THREAD_TRANSACTION_CONTEXT.set(tx);
return THREAD_TRANSACTION_CONTEXT.get();
}
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
}
说明:
- createNew()方法创建GlobalTransaction tx对象,类型是DefaultGlobalTransaction。
- 保存tx到线程当中实现线程隔离,THREAD_TRANSACTION_CONTEXT.set(tx)。
- GlobalTransaction对象负责执行事务的begin()、commit()、rollback()等方法。
public class DefaultGlobalTransaction implements GlobalTransaction {
private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;
private static final String DEFAULT_GLOBAL_TX_NAME = "default";
private TransactionManager transactionManager;
private String xid;
private GlobalStatus status = GlobalStatus.UnKnown;
private GlobalTransactionRole role = GlobalTransactionRole.Launcher;
DefaultGlobalTransaction(String xid) {
this.transactionManager = DefaultTransactionManager.get();
this.xid = xid;
if (xid != null) {
status = GlobalStatus.Begin;
role = GlobalTransactionRole.Participant;
}
}
@Override
public void begin(int timeout, String name) throws TransactionException {
if (xid == null && role == GlobalTransactionRole.Launcher) {
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
} else {
if (xid == null) {
throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
}
LOGGER.info(role + " is already in global transaction " + xid);
}
}
@Override
public void commit() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.commit(xid);
}
@Override
public void rollback() throws TransactionException {
check();
RootContext.unbind();
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
return;
}
status = transactionManager.rollback(xid);
}
@Override
public GlobalStatus getStatus() throws TransactionException {
check();
status = transactionManager.getStatus(xid);
return status;
}
}
说明:
- DefaultGlobalTransaction构造函数创建transactionManager 对象,this.transactionManager = DefaultTransactionManager.get();
- DefaultGlobalTransaction的begin/commit/rollback通过TransactionManager的begin/commit/rollback实现。
public class DefaultTransactionManager implements TransactionManager {
private static class SingletonHolder {
private static final TransactionManager INSTANCE = new DefaultTransactionManager();
}
public static TransactionManager get() {
return SingletonHolder.INSTANCE;
}
private DefaultTransactionManager() {
}
@Override
public String begin(String applicationId, String transactionServiceGroup,
String name, int timeout) throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setTransactionId(txId);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setTransactionId(txId);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
long txId = XID.getTransactionId(xid);
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setTransactionId(txId);
GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
说明:
- DefaultTransactionManager 是单例实现全局唯一。
- DefaultTransactionManager 是TM实现begin/commit/rollback的核心逻辑。
- DefaultTransactionManager 的begin/commit/rollback通过和TC通信实现。
- DefaultTransactionManager 的syncCall实现和TC通信。