Spring事务执行流程剖析2

警告
本文最后更新于 2020-12-07,文中内容可能已过时。

spring源码系列文章,示例代码的中文注释,均是 copy 自 https://gitee.com/wlizhi/spring-framework

链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。


1 事务的挂起

事务的挂起,调用的是 suspend()。

以下是 suspend() 源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					// TODO 重点:如果存在事务,则挂起事务,其实就是从ThreadLocal变量中解绑连接。
					suspendedResources = doSuspend(transaction);
				}
				// 清楚TransactionSynchronizationManager中的当前事务的一些信息。将旧的事务对象信息封装到挂起对象中,最终会绑定到当前事务对象。
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				// 返回一个挂起资源持有对象,这个对象中封装了当前已存在事务的具体信息。这个对象会存储到新开启事务的TransactionStatus中。
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}
}

这个方法中,首先会挂起前一个事务,并把前一个事务信息返回,最终封装到SuspendedResourcesHolder中,之后会将 SuspendedResourcesHolder 封装到当前事务对象中。

在封装之前,会清除 TransactionSynchronizationManager 中记载的当前事务的一些信息。

具体的挂起操作,就在 doSuspend() 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
	@Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		// 解绑链接资源
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}
}

可以看到,所谓的挂起,就是解绑resource,它存储在一个线程绑定的静态常量中。

来到 unbindResource():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
	private static Object doUnbindResource(Object actualKey) {
        // 从ThreadLocal的map缓存中获取,如果为空,直接返回,如果不为空,则移除绑定关系。
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        // 这里会解绑当前线程的链接资源。
        Object value = map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        // Transparently suppress a ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            value = null;
        }
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }
}

事务的挂起,实际上就是将 DataSourceTransactionManager 中的静态常量 resources 中存储的值给移除掉。这是一个 ThreadLocal 类型的 map。

其定义是这样的:private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

2 事务的提交

在执行完被代理方法后,最终会在 finally 语句块中调用 commitTransactionAfterReturning(),这个方法就完成了事务的提交操作。

源码如下:

1
2
3
4
5
6
7
8
9
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); }
			// 使用事务管理器,提交事务。
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}
}

其实这个方法中,不一定就是提交事务,如果设置了仅回滚,则会进行回滚操作。

具体逻辑在 commit()中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

   @Override
   	public final void commit(TransactionStatus status) throws TransactionException {
   		if (status.isCompleted()) {
   			throw new IllegalTransactionStateException(
   					"Transaction is already completed - do not call commit or rollback more than once per transaction");
   		}
   
   		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   		// TODO 如果事务信息中保存的是仅回滚,则进行回滚操作。比如假如到这个事务中的方法抛出异常,
   		//  则不会再那个方法就回滚,而是返回到事务的起点方法后,再进行回滚操作。
   		if (defStatus.isLocalRollbackOnly()) {
   			if (defStatus.isDebug()) {
   				logger.debug("Transactional code has requested rollback");
   			}
   			// 回滚事务
   			processRollback(defStatus, false);
   			return;
   		}
   		// 判断是否仅回滚,主要是看connectionHolder是否设置了仅回滚。
   		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
   			if (defStatus.isDebug()) {
   				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
   			}
   			// 回滚事务
   			processRollback(defStatus, true);
   			return;
   		}
   		// TODO 事务提交
   		processCommit(defStatus);
   	}
}

比如 RERUIRED 传播行为,前一个方法已经开启了事务,然后当前方法就会加入到这个已存在的事务中。如果这个内层的方法抛出异常,即使在外层方法中捕获异常,同样会回滚事务。

这个回滚操作并不是在内层方法中做的,在内层方法结束的时候,只会做一个 rollbackOnly 的标记。真正的回滚是在创建事务的起点方法做的。

所以上面代码中,会先判断是否仅回滚,如果是,则直接执行回滚操作。isGlobalRollbackOnly() 的判断于此类似。如果最终判断应该提交事务,则执行 processCommit() 进行事务的提交。

来到 processCommit() 源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
				// 预提交,这是一个钩子方法,点进去可以看到是一个空方法。
				prepareForCommit(status);
				//  TransactionSynchronization的扩展点可在事务方法中向TransactionSynchronizationManager注入回调。不同的方法回调时机不一样。
				// 也可以使用 @TransactionalEventListener,其原理也是注入 TransactionSynchronization 回调。
				// 事务同步回调接口TransactionSynchronization.beforeCommit()的调用。
				triggerBeforeCommit(status);
				// 事务同步回调接口TransactionSynchronization.beforeCompletion()的调用。
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;

				// TODO 是否有回滚点(保存点),如果是设置了回滚点,则仅仅吧回滚点抹除即可。真正的提交是在最外层事务提交做的。
				//  这也是NESTED传播行为,在外围方法事务异常时,所有嵌套事务全部回滚的原因所在。
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					// TODO 事务提交
					unexpectedRollback = status.isGlobalRollbackOnly();
					doCommit(status);
				}else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}
			    // 省略...
			}// 省略... 这里会有一些catch,对 TransactionSynchronization 扩展点的调用。
			finally {
				// TransactionSynchronization 扩展
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
			cleanupAfterCompletion(status);
		}
	}
}
  1. 在 processCommit() 中,会先进行一些扩展钩子方法的回调。这里使用了策略模式,会对 synchronizations 遍历,里面封装了所有注册的 TransactionSynchronization,遍历去调用对应的方法。
  2. 如果设置了保存点,则将保存点抹除。在 NESTED 传播行为中,它的实现就是使用的保存点(savePoint),NESTED 传播行为并不会新启一个事务,而是标记一个 savePoint,如果这个方法异常,就会回滚至 savePoint处。
  3. 如果是一个事务创建的起点,就会执行提交操作。

在 doCommit() 中进行了事务的提交:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
    @Override
	protected void doCommit(DefaultTransactionStatus status) {
		// 获取到事务的链接对象,提交事务
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		con.commit();
	}
}

从上面源码可以看到,拿到事务对象,从事务对象中拿到 ConnectionHolder,在拿到 Connection,使用 Connection 调用 commit() 进行事务的提交。

3 事务的回滚

在调用业务方法抛出异常时,事务的通知方法中,会捕获这个异常。然后对事务进行回滚操作,之后将异常再次抛出。

事务的回滚入口方法在 completeTransactionAfterThrowing():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
		// 如果存在事务状态对象
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			// 判断事务属性不为空并且满足回滚规则,就进行回滚,否则进行事务提交
			if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
					// TODO 重点:具体的回滚代码
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
				// 省略 try catch 块...
			}else {
                // 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
			    // 这里省略了 try catch 块...
			}
		}
	}
}

首先判断是否存在事务,如果存在事务,则判断,抛出的异常是否符合是需要回滚的异常。

具体哪些异常回滚是可以设置的,如果不设置,默认是 RuntimeException 和 Error。

简单看下 rollBackOn():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RuleBasedTransactionAttribute extends DefaultTransactionAttribute implements Serializable {

	@Override
	public boolean rollbackOn(Throwable ex) {
		RollbackRuleAttribute winner = null;
		int deepest = Integer.MAX_VALUE;

		// 判断回滚规则
		if (this.rollbackRules != null) {
			for (RollbackRuleAttribute rule : this.rollbackRules) {
				// 递归调用异常的父类,如果异常名称中包含规则中封装的异常名称,则返回。
				int depth = rule.getDepth(ex);
				// 如果不满足规则,会返回-1,如果满足了规则,会把这个规则赋值到winner临时变量中。
				if (depth >= 0 && depth < deepest) {
					deepest = depth;
					winner = rule;
				}
			}
		}

		if (logger.isTraceEnabled()) {
			logger.trace("Winning rollback rule is: " + winner);
		}

		// 如果没有配置回滚异常或不回滚异常,则会走默认的。默认回滚运行时异常及Error:
		// (ex instanceof RuntimeException || ex instanceof Error);
		// User superclass behavior (rollback on unchecked) if no rule matches.
		if (winner == null) {
			logger.trace("No relevant rollback rule found: applying default rules");
			return super.rollbackOn(ex);
		}
		// 判断规则如果是不回滚规则,则返回false,表示不需要回滚。
		// 判断规则如果不是不回滚规则,返回true,表示需要回滚
		return !(winner instanceof NoRollbackRuleAttribute);
	}
}

如果事务声明中配置了需要回滚的异常,就递归的匹配异常,获取其 RollbackRuleAttribute,如果获取的到,返回这个规则不属于不回滚的异常规则 true/fasle。

如果匹配不到,就使用默认的回滚规则,默认回滚规则在 super.rollbackOn(ex); 中定义了。

1
2
3
4
5
6
7
public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {

    @Override
	public boolean rollbackOn(Throwable ex) {
		return (ex instanceof RuntimeException || ex instanceof Error);
	}
}

回到 completeTransactionAfterThrowing() 中,接着看具体的回滚逻辑 rollback(),在这里会调用到 processRollback()。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                // 如果是一个新建的事务,即到了事务的起点。
                // 在回滚之前会循环调用TransactionSynchronization的beforeCompletion方法,这里是一个回调函数
                triggerBeforeCompletion(status);

                // 有保存点(或者叫回滚点),回滚到保存点位置
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }// 如果是一个新建的事务,即回到了事务的起点。
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    // 从当前事务状态对象中获取到链接对象,进行回滚操作
                    doRollback(status);
                }
                else {
                    // 如果是加入的已有事务,则将事务状态设置为仅回滚 rollbackOnly=true,最终返回到事务的起点时,是否向外抛出异常都会回滚。
                    // 所以在 REQUIRED 传播行为中,任何一个加入事务的方法异常,都会触发回滚。
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                // TransactionSynchronization 扩展点
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            // 事务提交后,获取事务同步器进行循环调用,TransactionSynchronization.afterCompletion
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }  
}

流程如下:

  1. 首先执行已注册的 TransactionSynchronization 的一些回调方法。
  2. 如果当前事务中有保存点(savePoint),就回滚至保存点。最终会通过 JDBC 来进行回滚。
  3. 如果当前事务是一个事务创建的起点,就回滚事务。
  4. 执行一些已注册的 TransactionSynchronization 进行后置处理。

doRollback() 源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    @Override
	protected void doRollback(DefaultTransactionStatus status) {
		// 从事务状态对象中获取当前持有的链接,进行回滚操作
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		con.rollback();
		// 省略 try catch 块...
	}
}

最终会调用 con.rollback() 进行事务的回滚。

而通过回滚点回滚,最终会调用到这里:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {

    @Override
	public void rollbackToSavepoint(Object savepoint) throws TransactionException {
		ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        conHolder.getConnection().rollback((Savepoint) savepoint);
        conHolder.resetRollbackOnly();
		// 省略 try catch 块...
	}
}

4 数据库连接的释放

无论是事务提交或是回滚,之后,都会调用一个方法 triggerAfterCompletion(),在这个方法中,就完成了数据库连接的释放。

在真正释放之前,同样也会调用已注册的 TransactionSynchronization 的一个扩展方法 resume()。

连接的释放,实际上就是通过 TransactionSynchronization 的扩展方法来实现的,这个方法是 afterCompletion()。

在 triggerAfterCompletion() 中,调用了 invokeAfterCompletion()。在这里就遍历了 synchronizations,进行 afterCompletion() 的调用。

源码如下(省略了try catch 代码块):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public abstract class TransactionSynchronizationUtils {
    public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
			int completionStatus) {
		if (synchronizations != null) {
			for (TransactionSynchronization synchronization : synchronizations) {
				synchronization.afterCompletion(completionStatus);
			}
		}
	}
}

在 afterCompletion() 中经历了以下调用链:

releaseConnection() -> doReleaseConnection() -> doCloseConnection() -> con.close()

以 druid 连接池为例,最终会调用到 recycle(),在recycle中,有这么一段代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {
	private volatile DruidConnectionHolder[] connections;
    // 步骤一
    protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
        // 省略无关代码...
        lock.lock();
        try {
            if (holder.active) {
                activeCount--;
                holder.active = false;
            }
            closeCount++;
            // 这个方法,将连接归还。
            result = putLast(holder, currentTimeMillis);
            recycleCount++;
        } finally {
            lock.unlock();
        }
        //省略无关代码...
    }

    // 步骤二
    boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
        if (poolingCount >= maxActive || e.discard) {
            return false;
        }

        e.lastActiveTimeMillis = lastActiveTimeMillis;
        // 这里,将连接归还到数组的指定索引处。
        connections[poolingCount] = e;
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = lastActiveTimeMillis;
        }

        notEmpty.signal();
        notEmptySignalCount++;

        return true;
    }
}

从上面代码中可以看到,在 result = putLast(holder, currentTimeMillis); 中,就将连接进行回收了。