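Background
Fescar is a distributed transaction middleware open-sourced by Alibaba. It addresses the distributed transaction problems that arise in microservice architectures, aiming to be efficient and to require zero intrusion into business code. For an overview, see the Fescar introduction.
The "Fescar example walkthrough" series reads through the source of Fescar's examples to trace Fescar's overall logic. It focuses on the end-to-end flow, so that readers come away with a high-level picture.
Finally, if you want to understand how Fescar works, read the Fescar introduction first. It is the first step toward understanding Fescar, and the most important one.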
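Example directory overview
Notes:
- The Fescar code directory is laid out as shown in the figure above.
- BusinessServiceImpl is the entry point.
- AccountServiceImpl, OrderServiceImpl, and StorageServiceImpl contain the business logic.
- BusinessServiceImpl ties together the call relationships among AccountServiceImpl, OrderServiceImpl, and StorageServiceImpl.
Notes:
- The call relationships between Business and the three services Storage, Order, and Account are shown in the figure above.
- RPC calls between components use the Dubbo protocol, and Dubbo carries the context identifier along with each call.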
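To make "carrying a context identifier along with each call" more concrete, here is a minimal sketch. It uses neither Dubbo nor Fescar: SketchContext, XidPropagationSketch, the attachment key "XID", and the extra thread that stands in for a remote service are all hypothetical. The assumption is that the caller copies the identifier from a thread-local into per-call attachments and the callee binds it again before running its handler.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical thread-bound context; a stand-in, not Fescar's RootContext. */
final class SketchContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }
    static String getXid() { return XID.get(); }
    static void unbind() { XID.remove(); }
}

final class XidPropagationSketch {
    static final String KEY = "XID"; // assumed attachment key, for illustration only

    /** Caller side: copy the current identifier into the attachments sent with the call. */
    static Map<String, String> buildAttachments() {
        Map<String, String> attachments = new HashMap<>();
        String xid = SketchContext.getXid();
        if (xid != null) {
            attachments.put(KEY, xid);
        }
        return attachments;
    }

    /** Callee side: bind the received identifier while the handler runs, then clean up. */
    static String handle(Map<String, String> attachments) {
        String received = attachments.get(KEY);
        if (received != null) {
            SketchContext.bind(received);
        }
        try {
            return "handled under xid=" + SketchContext.getXid();
        } finally {
            SketchContext.unbind();
        }
    }

    /** Runs the handler on another thread to mimic a remote service with its own context. */
    static String callRemotely(Map<String, String> attachments) {
        String[] result = new String[1];
        Thread remote = new Thread(() -> result[0] = handle(attachments));
        remote.start();
        try {
            remote.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        SketchContext.bind("tx-1");
        System.out.println(callRemotely(buildAttachments()));
        SketchContext.unbind();
    }
}
```

The point of the sketch is only that the identifier travels with the call rather than with the thread, so every participating service can tell which global transaction it belongs to.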
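Notes:
- GlobalTransactionScanner automatically scans for code annotated with @GlobalTransactional.
- GlobalTransactionalInterceptor intercepts methods annotated with @GlobalTransactional.
- To understand the core of the TM (Transaction Manager), focus on the GlobalTransactionalInterceptor code.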
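The idea of intercepting annotated methods can be sketched with a plain JDK dynamic proxy. This is not Fescar's code: the annotation SketchTransactional, the Shop interface, and the "begin"/"commit" log entries are hypothetical, and the sketch leaves out rollback and error handling entirely. It only shows how a wrapper can run extra logic around methods that carry a marker annotation and pass all other calls straight through.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker annotation; it must be retained at runtime to be visible via reflection. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchTransactional {
    String name() default "";
}

interface Shop {
    void purchase(String userId);
    void browse(String userId);
}

final class ShopImpl implements Shop {
    final List<String> log;

    ShopImpl(List<String> log) { this.log = log; }

    @Override
    @SketchTransactional(name = "demo-tx")
    public void purchase(String userId) { log.add("purchase " + userId); }

    @Override
    public void browse(String userId) { log.add("browse " + userId); }
}

final class InterceptionSketch {
    /** Wraps the target so that annotated methods are surrounded by begin/commit log entries. */
    static Shop wrap(ShopImpl target) {
        InvocationHandler handler = (proxy, method, args) -> {
            // The annotation sits on the implementation method, not on the interface method.
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            SketchTransactional anno = impl.getAnnotation(SketchTransactional.class);
            if (anno == null) {
                return call(method, target, args); // not annotated: pass through
            }
            target.log.add("begin " + anno.name());
            Object result = call(method, target, args);
            target.log.add("commit " + anno.name());
            return result;
        };
        return (Shop) Proxy.newProxyInstance(
            Shop.class.getClassLoader(), new Class<?>[] {Shop.class}, handler);
    }

    /** Invokes the method and unwraps the reflection wrapper around any thrown exception. */
    private static Object call(Method method, Object target, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Shop shop = wrap(new ShopImpl(log));
        shop.purchase("U100001");
        shop.browse("U100001");
        System.out.println(log);
    }
}
```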
Example code walkthrough
BusinessServiceImpl source
public class BusinessServiceImpl implements BusinessService {
    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);
    // Injected by Spring through the <property> elements in dubbo-business.xml
    private StorageService storageService;
    private OrderService orderService;

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
    }

    // Setters needed for the property injection configured in dubbo-business.xml
    public void setStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    public void setOrderService(OrderService orderService) {
        this.orderService = orderService;
    }

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
            new String[] {"dubbo-business.xml"});
        final BusinessService business = (BusinessService)context.getBean("business");
        LOGGER.info("Main business begin ... xid: " + RootContext.getXID());
        business.purchase("U100001", "C00321", 2);
        LOGGER.info("Main business end ... xid: " + RootContext.getXID());
    }
}
Notes:
- The core of BusinessServiceImpl is the annotation @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx").
- Because purchase is annotated with @GlobalTransactional, calls to it go through the logic in GlobalTransactionalInterceptor.
dubbo-business.xml source
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <dubbo:application name="dubbo-demo-app" />
    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />
    <dubbo:reference id="orderService" check="false" interface="com.alibaba.fescar.tm.dubbo.OrderService"/>
    <dubbo:reference id="storageService" check="false" interface="com.alibaba.fescar.tm.dubbo.StorageService"/>
    <bean id="business" class="com.alibaba.fescar.tm.dubbo.impl.BusinessServiceImpl">
        <property name="orderService" ref="orderService"/>
        <property name="storageService" ref="storageService"/>
    </bean>
    <bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
</beans>
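Notes:
- dubbo-business.xml defines a GlobalTransactionScanner bean.
- Once the GlobalTransactionScanner bean is defined, it scans for code annotated with @GlobalTransactional.
- How GlobalTransactionScanner scans and generates proxies depends on Spring internals that I have not fully worked out yet.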
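As a rough mental model only (it is not taken from GlobalTransactionScanner, whose Spring integration I have not verified), the "scan" step can be pictured as checking each bean's public methods for a marker annotation and proxying only the beans that have one. Everything in the sketch below is hypothetical: ScanMarker, PlainBean, TxBean, and ScannerSketch are names made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical marker annotation standing in for a transactional annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ScanMarker {
}

final class PlainBean {
    public void ping() { }
}

final class TxBean {
    @ScanMarker
    public void purchase() { }

    public void other() { }
}

final class ScannerSketch {
    /** True if any public method of the type carries the marker annotation. */
    static boolean hasMarkedMethod(Class<?> type) {
        for (Method method : type.getMethods()) {
            if (method.isAnnotationPresent(ScanMarker.class)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the names of the beans that would need a proxy, in registration order. */
    static List<String> beansToProxy(Map<String, Object> beans) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            if (hasMarkedMethod(entry.getValue().getClass())) {
                names.add(entry.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("plain", new PlainBean());
        beans.put("business", new TxBean());
        System.out.println(beansToProxy(beans));
    }
}
```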
GlobalTransactionalInterceptor overview
public class GlobalTransactionalInterceptor implements MethodInterceptor {
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final FailureHandler failureHandler;

    // Constructor and the helper methods getAnnotation/formatMethod are omitted from this excerpt

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
        if (anno != null) {
            try {
                // The core entry point is transactionalTemplate.execute
                return transactionalTemplate.execute(new TransactionalExecutor() {
                    @Override
                    public Object execute() throws Throwable {
                        return methodInvocation.proceed();
                    }

                    @Override
                    public int timeout() {
                        return anno.timeoutMills();
                    }

                    @Override
                    public String name() {
                        if (anno.name() != null) {
                            return anno.name();
                        }
                        return formatMethod(methodInvocation.getMethod());
                    }
                });
            } catch (TransactionalExecutor.ExecutionException e) {
                TransactionalExecutor.Code code = e.getCode();
                switch (code) {
                    case RollbackDone:
                        throw e.getOriginalException();
                    case BeginFailure:
                        failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case CommitFailure:
                        failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case RollbackFailure:
                        failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    default:
                        throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
                }
            }
        }
        // Methods without the annotation are invoked directly
        return methodInvocation.proceed();
    }
}
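Notes:
- Focus on the invoke() method of GlobalTransactionalInterceptor.
- invoke() calls transactionalTemplate.execute, which starts the execution of the global transaction.
The class that executes Fescar global transactions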
public class TransactionalTemplate {
    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
        // 1. Get the current global transaction, or create one
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. Begin the transaction
        try {
            tx.begin(business.timeout(), business.name());
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }

        Object rs = null;
        try {
            // Run the business logic
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. Roll back the global transaction
            try {
                tx.rollback();
                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);
            }
        }

        // 4. Commit the global transaction
        try {
            tx.commit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
        return rs;
    }
}
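Notes:
- Create the global transaction object: GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate().
- Begin the global transaction: tx.begin(business.timeout(), business.name()).
- Run the business logic: rs = business.execute().
- Commit the transaction if the business logic returns normally: tx.commit().
- Roll back the transaction if the business logic throws: tx.rollback().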
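The begin / execute / commit-or-rollback sequence listed above can be tried out in isolation with a stripped-down sketch. It is deliberately not Fescar's TransactionalTemplate: RecordingTx is a hypothetical in-memory stand-in that only records which steps ran, and the sketch ignores failures of begin, commit, and rollback themselves, which the real template wraps in ExecutionException codes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a global transaction; it only records the steps it was asked to run. */
final class RecordingTx {
    final List<String> steps = new ArrayList<>();

    void begin(String name) { steps.add("begin " + name); }
    void commit() { steps.add("commit"); }
    void rollback() { steps.add("rollback"); }
}

final class TemplateSketch {
    /** Begins the transaction, runs the business logic, then commits on success or rolls back on failure. */
    static <T> T execute(RecordingTx tx, String name, Supplier<T> business) {
        tx.begin(name);
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            tx.rollback();
            throw ex; // surface the original business exception to the caller
        }
        tx.commit();
        return result;
    }

    public static void main(String[] args) {
        RecordingTx ok = new RecordingTx();
        execute(ok, "dubbo-demo-tx", () -> "order created");
        System.out.println(ok.steps);

        RecordingTx failed = new RecordingTx();
        try {
            execute(failed, "dubbo-demo-tx", () -> {
                throw new IllegalStateException("insufficient stock");
            });
        } catch (IllegalStateException expected) {
            System.out.println(failed.steps);
        }
    }
}
```

Keeping the transaction steps in a template and the business logic in a callback is what lets the annotated method stay free of transaction code.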
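Next steps
The next article will try to analyze each step of the transaction in detail, including how it is begun, committed, and rolled back.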