Seata 分支事务

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文先来介绍 Seata 中分支事务的整体实现思想。

Branch Type

我们已经知道在 Seata 中, 分支事务分 AT 模式和 TCC 模式, 那么, Seata 是怎么区分出 AT 模式和 TCC 模式的呢? 这也借助了 Spring 的 AOP 特性, 我们在 TM 中介绍的 GlobalTransactionalInterceptor 实际上只负责 AT 模式, TCC 模式是另一套拦截器实现, 而这两种拦截器的注入, 全都是在 GlobalTransactionScanner 中进行的, 也就是说, 我们的 Spring 项目要将 GlobalTransactionScanner 注册为 Bean, 因为其继承自 AbstractAutoProxyCreator, Spring 在处理 AOP 过程时, 就会自动将 GlobalTransactionScanner 执行。接下来, 我们看看整个 Seata 的入口 GlobalTransactionScanner 的核心代码:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware, DisposableBean, BeanPostProcessor {
    // 只保留核心代码...

    // 判断是否需要嵌入 Seata 的 AOP 代码
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy, 实际上是根据 TCC 注解 TwoPhaseBusinessAction + RPC 协议类型进行判断, TCC 目前只支持一些特定的协议
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    // 识别出 TCC 模式, 使用 TCC 拦截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                     // 识别出AT 模式, 使用前面提到的 GlobalTransactionalInterceptor
                    if (interceptor == null) {
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    }
                }

                LOGGER.info(
                    "Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
                        + interceptor.getClass().getName() + "]");
                if (!AopUtils.isAopProxy(bean)) {
                    // 如果该类不由 Spring 管控, 则无能为力
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 如果是由 Spring 管控, 将 TCC 拦截器 或者 AT 拦截器注入
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, new Object[]{interceptor});
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    // 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
    private boolean existsAnnotation(Class<?>[] classes) {
        if (classes != null && classes.length > 0) {
            for (Class clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }

                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
    // 替换默认的数据库连接源, 改为 AT 模式的数据源代理
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Auto proxy of  [" + beanName + "]");
            }
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
            return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
                Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
                if (null != m) {
                    return m.invoke(dataSourceProxy, args);
                } else {
                    return method.invoke(bean, args);
                }
            });
        }
        return bean;
    }
}

整个项目是从这个 GlobalTransactionScanner 作为起点接入 Seata 的, 一般来说 SpringBoot 会用 @Configuration 类将其注册为 @Bean, 原生 Spring 项目需要在在 XML 中将其注册为 Bean,这一点我们从官方的 Sample 中就能发现。

// SpringBoot
@Configuration
public class SeataAutoConfig {
    /**
     * init global transaction scanner
     *
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(){
        return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group");
    }
}

Spring 配置如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!--省略其他内容-->
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

</beans>

好了, 想必大家应该已经清楚 Seata 中是如何发现 TCC 模式和 AT 模式的了, 我们总结一下:

  1. 扫描 Spring 代理的类
  2. 如果发现带有 TCC 的注解, 并且 RPC 协议满足条件, 那么走 TCC 拦截器
  3. 如果发现带有 AT 的注解, 那么走 AT 拦截器
  4. 否则, 什么都不干

接下来我们, 分别看一下这两种模式各自都是如何运作起来的。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

Seata 分支事务

上一篇:Seata AT 分支事务


下一篇:业务无侵入框架Seata, 解决分布式事务问题