TransactionAspectSupport
invokeWithinTransaction
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 创建或者拿到当前事务信息
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();// 执行切面
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);// 拿回上一个事务
}
commitTransactionAfterReturning(txInfo);// 提交当前事务
return retVal;
}
}
上述代码逻辑还是很清晰的
- 先从
TransactionDefinition(TransactionAttribute)
开始,看看当前这个切面需不需要执行事务, - 如果能获取到,则通过其获取到合适的
PlatformTransactionManager
,创建或者获取、封装到我们的TransactionInfo
。 - 执行切面,再根据切面情况选择提交或者回滚。
createTransactionIfNecessary
获取事务信息
- 事务的信息会在
TransactionAspectSupport#createTransactionIfNecessary
方法中获取,这个方法非常重要,隔离级别、传播方式都会在这个方法里处理。该方法代码如下:
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果未指定名称,则将方法名当做事务名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 获取事务状态,如果当前没有事务,可能会创建事务
status = tm.getTransaction(txAttr);
}
}
// 准备事务信息,就是将前面得到的信息封装成 TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
这个方法主要是两个操作:
- 获取事务状态
- 准备事务信息
prepareTransactionInfo
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
我们发现它这里有一个 ThreadLocal
,它会将之前已经保存在这里的 TransactionInfo
拿出来,放到刚才上面提到的 old
里面持有,而当前这个 TransactionInfo
对象则会扔进这个 ThreadLocal
里面,然后在执行完切面后把 old
放回去,形成了一个事务栈:
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
AbstractPlatformTransactionManager
getTransaction
获取事务状态的流程,方法为AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
TransactionDefinition def = (definition != null ?
definition : TransactionDefinition.withDefaults());
// 获取事务对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 是否存在事务,存在则返回
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 运行到了这里,表明当前没有事务
// 检查超时时间的设置是否合理
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// PROPAGATION_MANDATORY:必须在事务中运行,这里没有事务,直接抛异常
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(...);
}
// 挂起当前事务,创建新事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// suspend(...) 传入null:如果有同步事务,则挂起同步事务,否则什么也不做
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建事务对象
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 启动事务
doBegin(transaction, def);
// 设置 TransactionSynchronizationManager 的属性
prepareSynchronization(status, def);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
handleExistingTransaction
这里我们来看看如果当前存在事务,spring是怎么处理的,处理已存在事务的方法为AbstractPlatformTransactionManager#handleExistingTransaction
,代码如下:
private TransactionStatus handleExistingTransaction(TransactionDefinition definition,
Object transaction, boolean debugEnabled) throws TransactionException {
// 当传播方式为【不使用事务 PROPAGATION_NEVER】时,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 当传播方式为【不支持事务 PROPAGATION_NOT_SUPPORTED】时,挂起当前事务,然后在无事务的状态中运行
// // 第一步,`suspend(trx)` 挂了了一个事务,并且在创建 `TransactionStatus` 时,没有放入 `transaction` 对象,也就是连接对象。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 1. suspend():挂起事务操作
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 当传播方式为【在新的事务中运行 PROPAGATION_REQUIRES_NEW】时,挂起当前事务,然后启动新的事务
// 这里直接挂起事务,并且 `newTransactionStatus` + `doBegin` +`prepareSynchronization` 三部曲,进行 `TransactionStatus` 的初始化。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 挂起事务操作
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
newSynchronization, debugEnabled, suspendedResources);
// 2. doBegin():启动新的事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 当传播方式为【嵌套执行 PROPAGATION_NESTED】时, 设置事务的保存点
// 存在事务,将该事务标注保存点,形成嵌套事务。
// 嵌套事务中的子事务出现异常不会影响到父事务保存点之前的操作。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(...);
}
// 3. createAndHoldSavepoint(...):创建保存点,回滚时只回滚到该保存点
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction,
false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// 没有使用 `savePoint` 则还是三部曲,但是这个三部曲,由于我们已经有一个现有的连接,
// 所以会创建一个 `TransactionStatus`,但是他们的连接也就是 `connectionHolder` 使用的是同一个对象。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
// 如果不支持保存点,就启动新的事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
if (isValidateExistingTransaction()) {
// 处理验证操作,不作分析
...
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 当传播方式为【嵌套执行 PROPAGATION_REQUIRED、PROPAGATION_SUPPORT、PROPAGATION_MAMDY】
return prepareTransactionStatus(definition, transaction, false,
newSynchronization, debugEnabled, null);
}
可以看到,这个方法里就处理了事务的隔离级别的逻辑,相关的代码已经作了注释,这里就不多说了,不过这里有几个方法需要特别提出:
-
suspend()
:挂起事务操作 -
doBegin()
:启动新的事务 -
createAndHoldSavepoint(...)
:创建保存点,回滚时只回滚到该保存点
suspend 和 resume
上面那些东西实际上都比较简单,就是判断是否获取新的连接,然后创建一个 TransactionInfo
对象,但是里面有一个稍微复杂点的东西,就是事务的挂起和恢复,是怎么做的?
我们先看挂起:
suspend 挂起事务
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
当一个事务被挂起,且处于同步状态(由于对于 TranstaionStatus 初始化三步曲的最后一步 prepare 会 initSynchronization
,搞了一个 linkedHashSet<TransactionSynchronization>
)
- 此时会先
doSuspendSynchronization();
,里面很简单,就是把我们注册的那些TransactionSynchronization
的suspend()
方法都跑一遍,并且把它们全部清除避免被跑两次以上,并且拿到这部分注册的linkedHashSet<TransactionSynchronization>
:suspendedSynchronizations
如下:
private List<TransactionSynchronization> doSuspendSynchronization() {
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}
紧接着调用 PlatfromTransactionManager
的 doSuspend()
,代码也是很简单,就是把刚才来来回回讲的那个 transactionHolder
移除掉,返回的是我们的事务对象,也就是包含了 connection
的那个。
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
后续的操作也很容易看懂,我们把它保存在 TransactionSynchronizationManager
的那些 ThreadLocal
全部拿到原值,并把现有的值清除掉,并且我们把上面说的那些乱七八糟的包括 suspendedSynchronizations
,以及 transaction
一并放到一个叫做 SuspendedResourcesHolder
的对象里面。
这样,这个事务就和当前线程没有半毛钱关系了,这些变量全部被保存在 SuspendedResourcesHolder
里面,这个保存着事务信息的对象会保存在我们新的 TransactionStatus
里面,那么问题来了,怎么恢复?实际上你都知道怎么挂起了,还不知道怎么恢复吗?
resume 恢复事务
当然是反过来操作!
resume()
的调用入口如下:
AbstractPlatformTransactionManager
getTransaction(TransactionDefinition)
resumeAfterBeginException(Object, SuspendedResourcesHolder, Throwable)
cleanupAfterCompletion(DefaultTransactionStatus)
很简单,分两种,第一种是当前事务报错了会恢复到上一个事务。第二种是当前事务执行完毕了,会调用恢复。代码很简单,如下:我就不啰嗦了,是上面的反操作。
-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#resume --
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
prepareTransactionStatus
准备返回结果:prepareTransactionStatus(...)
-
handleExistingTransaction(...)
方法与getTransaction(...)
方法在处理返回结果时,都使用了prepareTransactionStatus(...)
方法:// `handleExistingTransaction(...)`方法 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); // `getTransaction(...)`方法 return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
我们来分析下这个方法是做了啥,进入AbstractPlatformTransactionManager#prepareTransactionStatus
:
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 创建了一个 DefaultTransactionStatus 对象
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 准备 Synchronization
prepareSynchronization(status, definition);
return status;
}
/**
*创建一个 TransactionStatus 实例
*/
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
// 调用 DefaultTransactionStatus 的构造方法
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}