【Spring】Transation:源码执行流程(上)准备阶段

TransactionAspectSupport

invokeWithinTransaction

@Nullable
	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		final TransactionManager tm = determineTransactionManager(txAttr);
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			// 创建或者拿到当前事务信息
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); 

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation();// 执行切面
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);// 拿回上一个事务
			}
			
			commitTransactionAfterReturning(txInfo);// 提交当前事务
			return retVal;
		}
	}

上述代码逻辑还是很清晰的

  • 先从 TransactionDefinition(TransactionAttribute) 开始,看看当前这个切面需不需要执行事务,
  • 如果能获取到,则通过其获取到合适的 PlatformTransactionManager,创建或者获取、封装到我们的 TransactionInfo
  • 执行切面,再根据切面情况选择提交或者回滚。

createTransactionIfNecessary

获取事务信息

  • 事务的信息会在TransactionAspectSupport#createTransactionIfNecessary方法中获取,这个方法非常重要,隔离级别、传播方式都会在这个方法里处理。该方法代码如下:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // 如果未指定名称,则将方法名当做事务名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 获取事务状态,如果当前没有事务,可能会创建事务
            status = tm.getTransaction(txAttr);
        }
    }
    // 准备事务信息,就是将前面得到的信息封装成 TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

这个方法主要是两个操作:

  1. 获取事务状态
  2. 准备事务信息

prepareTransactionInfo

	protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, String joinpointIdentification,
			@Nullable TransactionStatus status) {

		TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
		// We always bind the TransactionInfo to the thread, even if we didn't create
		// a new transaction here. This guarantees that the TransactionInfo stack
		// will be managed correctly even if no transaction was created by this aspect.
		txInfo.bindToThread();
		return txInfo;
	}

	private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
			new NamedThreadLocal<>("Current aspect-driven transaction");

	private void bindToThread() {
		// Expose current TransactionStatus, preserving any existing TransactionStatus
		// for restoration after this transaction is complete.
		this.oldTransactionInfo = transactionInfoHolder.get();
		transactionInfoHolder.set(this);
	}

我们发现它这里有一个 ThreadLocal,它会将之前已经保存在这里的 TransactionInfo 拿出来,放到刚才上面提到的 old 里面持有,而当前这个 TransactionInfo 对象则会扔进这个 ThreadLocal 里面,然后在执行完切面后把 old 放回去,形成了一个事务栈:

	protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
		if (txInfo != null) {
			txInfo.restoreThreadLocalStatus();
		}
	}

	private void restoreThreadLocalStatus() {
		// Use stack to restore old transaction TransactionInfo.
		// Will be null if none was set.
		transactionInfoHolder.set(this.oldTransactionInfo);
	}

AbstractPlatformTransactionManager

getTransaction

获取事务状态的流程,方法为AbstractPlatformTransactionManager#getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    TransactionDefinition def = (definition != null ? 
            definition : TransactionDefinition.withDefaults());

    // 获取事务对象
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 是否存在事务,存在则返回
    if (isExistingTransaction(transaction)) {
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // 运行到了这里,表明当前没有事务

    // 检查超时时间的设置是否合理
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // PROPAGATION_MANDATORY:必须在事务中运行,这里没有事务,直接抛异常
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(...);
    }
    // 挂起当前事务,创建新事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // suspend(...) 传入null:如果有同步事务,则挂起同步事务,否则什么也不做
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 创建事务对象
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 启动事务
            doBegin(transaction, def);
            // 设置 TransactionSynchronizationManager 的属性
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

handleExistingTransaction

这里我们来看看如果当前存在事务,spring是怎么处理的,处理已存在事务的方法为AbstractPlatformTransactionManager#handleExistingTransaction,代码如下:

private TransactionStatus handleExistingTransaction(TransactionDefinition definition, 
        Object transaction, boolean debugEnabled) throws TransactionException {
    // 当传播方式为【不使用事务 PROPAGATION_NEVER】时,抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // 当传播方式为【不支持事务 PROPAGATION_NOT_SUPPORTED】时,挂起当前事务,然后在无事务的状态中运行
    // // 第一步,`suspend(trx)` 挂了了一个事务,并且在创建 `TransactionStatus` 时,没有放入 `transaction` 对象,也就是连接对象。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // 1. suspend():挂起事务操作
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // 当传播方式为【在新的事务中运行 PROPAGATION_REQUIRES_NEW】时,挂起当前事务,然后启动新的事务
    // 这里直接挂起事务,并且 `newTransactionStatus` + `doBegin` +`prepareSynchronization` 三部曲,进行 `TransactionStatus` 的初始化。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // 挂起事务操作
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
                    newSynchronization, debugEnabled, suspendedResources);
            // 2. doBegin():启动新的事务
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // 当传播方式为【嵌套执行 PROPAGATION_NESTED】时, 设置事务的保存点
    // 存在事务,将该事务标注保存点,形成嵌套事务。
    // 嵌套事务中的子事务出现异常不会影响到父事务保存点之前的操作。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(...);
        }
        // 3. createAndHoldSavepoint(...):创建保存点,回滚时只回滚到该保存点
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction,
                    false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
        	// 没有使用 `savePoint` 则还是三部曲,但是这个三部曲,由于我们已经有一个现有的连接,
        	// 所以会创建一个 `TransactionStatus`,但是他们的连接也就是 `connectionHolder` 使用的是同一个对象。
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            // 如果不支持保存点,就启动新的事务
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }
    if (isValidateExistingTransaction()) {
        // 处理验证操作,不作分析
        ...
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 当传播方式为【嵌套执行 PROPAGATION_REQUIRED、PROPAGATION_SUPPORT、PROPAGATION_MAMDY】
    return prepareTransactionStatus(definition, transaction, false, 
            newSynchronization, debugEnabled, null);
}

可以看到,这个方法里就处理了事务的隔离级别的逻辑,相关的代码已经作了注释,这里就不多说了,不过这里有几个方法需要特别提出:

  1. suspend():挂起事务操作
  2. doBegin():启动新的事务
  3. createAndHoldSavepoint(...):创建保存点,回滚时只回滚到该保存点

suspend 和 resume

上面那些东西实际上都比较简单,就是判断是否获取新的连接,然后创建一个 TransactionInfo 对象,但是里面有一个稍微复杂点的东西,就是事务的挂起和恢复,是怎么做的?

我们先看挂起:

suspend 挂起事务

	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					suspendedResources = doSuspend(transaction);
				}
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

当一个事务被挂起,且处于同步状态(由于对于 TranstaionStatus 初始化三步曲的最后一步 prepare 会 initSynchronization,搞了一个 linkedHashSet<TransactionSynchronization>

  • 此时会先 doSuspendSynchronization(); ,里面很简单,就是把我们注册的那些 TransactionSynchronizationsuspend() 方法都跑一遍,并且把它们全部清除避免被跑两次以上,并且拿到这部分注册的 linkedHashSet<TransactionSynchronization> : suspendedSynchronizations如下:
	private List<TransactionSynchronization> doSuspendSynchronization() {
		List<TransactionSynchronization> suspendedSynchronizations =
				TransactionSynchronizationManager.getSynchronizations();
		for (TransactionSynchronization synchronization : suspendedSynchronizations) {
			synchronization.suspend();
		}
		TransactionSynchronizationManager.clearSynchronization();
		return suspendedSynchronizations;
	}

紧接着调用 PlatfromTransactionManagerdoSuspend(),代码也是很简单,就是把刚才来来回回讲的那个 transactionHolder 移除掉,返回的是我们的事务对象,也就是包含了 connection 的那个

	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

后续的操作也很容易看懂,我们把它保存在 TransactionSynchronizationManager 的那些 ThreadLocal 全部拿到原值,并把现有的值清除掉,并且我们把上面说的那些乱七八糟的包括 suspendedSynchronizations ,以及 transaction 一并放到一个叫做 SuspendedResourcesHolder 的对象里面。

这样,这个事务就和当前线程没有半毛钱关系了,这些变量全部被保存在 SuspendedResourcesHolder 里面,这个保存着事务信息的对象会保存在我们新的 TransactionStatus 里面,那么问题来了,怎么恢复?实际上你都知道怎么挂起了,还不知道怎么恢复吗?

resume 恢复事务

当然是反过来操作!

resume() 的调用入口如下:

AbstractPlatformTransactionManager
getTransaction(TransactionDefinition)
resumeAfterBeginException(Object, SuspendedResourcesHolder, Throwable)
cleanupAfterCompletion(DefaultTransactionStatus)

很简单,分两种,第一种是当前事务报错了会恢复到上一个事务。第二种是当前事务执行完毕了,会调用恢复。代码很简单,如下:我就不啰嗦了,是上面的反操作。

-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#resume --

	protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
			throws TransactionException {

		if (resourcesHolder != null) {
			Object suspendedResources = resourcesHolder.suspendedResources;
			if (suspendedResources != null) {
				doResume(transaction, suspendedResources);
			}
			List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
			if (suspendedSynchronizations != null) {
				TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
				TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
				doResumeSynchronization(suspendedSynchronizations);
			}
		}
	}

prepareTransactionStatus

准备返回结果:prepareTransactionStatus(...)

  • handleExistingTransaction(...)方法与getTransaction(...)方法在处理返回结果时,都使用了prepareTransactionStatus(...)方法:

    // `handleExistingTransaction(...)`方法
    return prepareTransactionStatus(definition, transaction, false, 
                newSynchronization, debugEnabled, null);
    
    // `getTransaction(...)`方法
    return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    

我们来分析下这个方法是做了啥,进入AbstractPlatformTransactionManager#prepareTransactionStatus

protected final DefaultTransactionStatus prepareTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    // 创建了一个 DefaultTransactionStatus 对象
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
    // 准备 Synchronization
    prepareSynchronization(status, definition);
    return status;
}

/**
 *创建一个 TransactionStatus 实例
 */
protected DefaultTransactionStatus newTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    boolean actualNewSynchronization = newSynchronization &&
            !TransactionSynchronizationManager.isSynchronizationActive();
    // 调用 DefaultTransactionStatus 的构造方法
    return new DefaultTransactionStatus(
            transaction, newTransaction, actualNewSynchronization,
            definition.isReadOnly(), debug, suspendedResources);
}
上一篇:自相矛盾:一个进程可以自成死锁么


下一篇:安卓类微信项目