spring源码系列文章,示例代码的中文注释,均是 copy 自 https://gitee.com/wlizhi/spring-framework 。
链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。
1 事务的挂起
事务的挂起,调用的是 suspend()。
以下是 suspend() 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// TODO 重点:如果存在事务,则挂起事务,其实就是从ThreadLocal变量中解绑连接。
suspendedResources = doSuspend(transaction);
}
// 清楚TransactionSynchronizationManager中的当前事务的一些信息。将旧的事务对象信息封装到挂起对象中,最终会绑定到当前事务对象。
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 返回一个挂起资源持有对象,这个对象中封装了当前已存在事务的具体信息。这个对象会存储到新开启事务的TransactionStatus中。
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
}
|
这个方法中,首先会挂起前一个事务,并把前一个事务信息返回,最终封装到SuspendedResourcesHolder中,之后会将 SuspendedResourcesHolder 封装到当前事务对象中。
在封装之前,会清除 TransactionSynchronizationManager 中记载的当前事务的一些信息。
具体的挂起操作,就在 doSuspend() 中:
1
2
3
4
5
6
7
8
9
10
|
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
// 解绑链接资源
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
}
|
可以看到,所谓的挂起,就是解绑resource,它存储在一个线程绑定的静态常量中。
来到 unbindResource():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
private static Object doUnbindResource(Object actualKey) {
// 从ThreadLocal的map缓存中获取,如果为空,直接返回,如果不为空,则移除绑定关系。
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
// 这里会解绑当前线程的链接资源。
Object value = map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
value = null;
}
if (value != null && logger.isTraceEnabled()) {
logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
}
|
事务的挂起,实际上就是将 DataSourceTransactionManager 中的静态常量 resources 中存储的值给移除掉。这是一个 ThreadLocal 类型的 map。
其定义是这样的:private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
2 事务的提交
在执行完被代理方法后,最终会在 finally 语句块中调用 commitTransactionAfterReturning(),这个方法就完成了事务的提交操作。
源码如下:
1
2
3
4
5
6
7
8
9
|
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); }
// 使用事务管理器,提交事务。
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
}
|
其实这个方法中,不一定就是提交事务,如果设置了仅回滚,则会进行回滚操作。
具体逻辑在 commit()中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// TODO 如果事务信息中保存的是仅回滚,则进行回滚操作。比如假如到这个事务中的方法抛出异常,
// 则不会再那个方法就回滚,而是返回到事务的起点方法后,再进行回滚操作。
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 回滚事务
processRollback(defStatus, false);
return;
}
// 判断是否仅回滚,主要是看connectionHolder是否设置了仅回滚。
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 回滚事务
processRollback(defStatus, true);
return;
}
// TODO 事务提交
processCommit(defStatus);
}
}
|
比如 RERUIRED 传播行为,前一个方法已经开启了事务,然后当前方法就会加入到这个已存在的事务中。如果这个内层的方法抛出异常,即使在外层方法中捕获异常,同样会回滚事务。
这个回滚操作并不是在内层方法中做的,在内层方法结束的时候,只会做一个 rollbackOnly 的标记。真正的回滚是在创建事务的起点方法做的。
所以上面代码中,会先判断是否仅回滚,如果是,则直接执行回滚操作。isGlobalRollbackOnly() 的判断于此类似。如果最终判断应该提交事务,则执行 processCommit() 进行事务的提交。
来到 processCommit() 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预提交,这是一个钩子方法,点进去可以看到是一个空方法。
prepareForCommit(status);
// TransactionSynchronization的扩展点可在事务方法中向TransactionSynchronizationManager注入回调。不同的方法回调时机不一样。
// 也可以使用 @TransactionalEventListener,其原理也是注入 TransactionSynchronization 回调。
// 事务同步回调接口TransactionSynchronization.beforeCommit()的调用。
triggerBeforeCommit(status);
// 事务同步回调接口TransactionSynchronization.beforeCompletion()的调用。
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// TODO 是否有回滚点(保存点),如果是设置了回滚点,则仅仅吧回滚点抹除即可。真正的提交是在最外层事务提交做的。
// 这也是NESTED传播行为,在外围方法事务异常时,所有嵌套事务全部回滚的原因所在。
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// TODO 事务提交
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 省略...
}// 省略... 这里会有一些catch,对 TransactionSynchronization 扩展点的调用。
finally {
// TransactionSynchronization 扩展
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
}
|
- 在 processCommit() 中,会先进行一些扩展钩子方法的回调。这里使用了策略模式,会对 synchronizations 遍历,里面封装了所有注册的 TransactionSynchronization,遍历去调用对应的方法。
- 如果设置了保存点,则将保存点抹除。在 NESTED 传播行为中,它的实现就是使用的保存点(savePoint),NESTED 传播行为并不会新启一个事务,而是标记一个 savePoint,如果这个方法异常,就会回滚至 savePoint处。
- 如果是一个事务创建的起点,就会执行提交操作。
在 doCommit() 中进行了事务的提交:
1
2
3
4
5
6
7
8
9
10
|
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Override
protected void doCommit(DefaultTransactionStatus status) {
// 获取到事务的链接对象,提交事务
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
con.commit();
}
}
|
从上面源码可以看到,拿到事务对象,从事务对象中拿到 ConnectionHolder,在拿到 Connection,使用 Connection 调用 commit() 进行事务的提交。
3 事务的回滚
在调用业务方法抛出异常时,事务的通知方法中,会捕获这个异常。然后对事务进行回滚操作,之后将异常再次抛出。
事务的回滚入口方法在 completeTransactionAfterThrowing():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 如果存在事务状态对象
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 判断事务属性不为空并且满足回滚规则,就进行回滚,否则进行事务提交
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
// TODO 重点:具体的回滚代码
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
// 省略 try catch 块...
}else {
// 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
// 这里省略了 try catch 块...
}
}
}
}
|
首先判断是否存在事务,如果存在事务,则判断,抛出的异常是否符合是需要回滚的异常。
具体哪些异常回滚是可以设置的,如果不设置,默认是 RuntimeException 和 Error。
简单看下 rollBackOn():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class RuleBasedTransactionAttribute extends DefaultTransactionAttribute implements Serializable {
@Override
public boolean rollbackOn(Throwable ex) {
RollbackRuleAttribute winner = null;
int deepest = Integer.MAX_VALUE;
// 判断回滚规则
if (this.rollbackRules != null) {
for (RollbackRuleAttribute rule : this.rollbackRules) {
// 递归调用异常的父类,如果异常名称中包含规则中封装的异常名称,则返回。
int depth = rule.getDepth(ex);
// 如果不满足规则,会返回-1,如果满足了规则,会把这个规则赋值到winner临时变量中。
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("Winning rollback rule is: " + winner);
}
// 如果没有配置回滚异常或不回滚异常,则会走默认的。默认回滚运行时异常及Error:
// (ex instanceof RuntimeException || ex instanceof Error);
// User superclass behavior (rollback on unchecked) if no rule matches.
if (winner == null) {
logger.trace("No relevant rollback rule found: applying default rules");
return super.rollbackOn(ex);
}
// 判断规则如果是不回滚规则,则返回false,表示不需要回滚。
// 判断规则如果不是不回滚规则,返回true,表示需要回滚
return !(winner instanceof NoRollbackRuleAttribute);
}
}
|
如果事务声明中配置了需要回滚的异常,就递归的匹配异常,获取其 RollbackRuleAttribute,如果获取的到,返回这个规则不属于不回滚的异常规则 true/fasle。
如果匹配不到,就使用默认的回滚规则,默认回滚规则在 super.rollbackOn(ex);
中定义了。
1
2
3
4
5
6
7
|
public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {
@Override
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
}
|
回到 completeTransactionAfterThrowing() 中,接着看具体的回滚逻辑 rollback(),在这里会调用到 processRollback()。
源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 如果是一个新建的事务,即到了事务的起点。
// 在回滚之前会循环调用TransactionSynchronization的beforeCompletion方法,这里是一个回调函数
triggerBeforeCompletion(status);
// 有保存点(或者叫回滚点),回滚到保存点位置
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}// 如果是一个新建的事务,即回到了事务的起点。
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 从当前事务状态对象中获取到链接对象,进行回滚操作
doRollback(status);
}
else {
// 如果是加入的已有事务,则将事务状态设置为仅回滚 rollbackOnly=true,最终返回到事务的起点时,是否向外抛出异常都会回滚。
// 所以在 REQUIRED 传播行为中,任何一个加入事务的方法异常,都会触发回滚。
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
// TransactionSynchronization 扩展点
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 事务提交后,获取事务同步器进行循环调用,TransactionSynchronization.afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
}
|
流程如下:
- 首先执行已注册的 TransactionSynchronization 的一些回调方法。
- 如果当前事务中有保存点(savePoint),就回滚至保存点。最终会通过 JDBC 来进行回滚。
- 如果当前事务是一个事务创建的起点,就回滚事务。
- 执行一些已注册的 TransactionSynchronization 进行后置处理。
doRollback() 源码如下:
1
2
3
4
5
6
7
8
9
10
|
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
protected void doRollback(DefaultTransactionStatus status) {
// 从事务状态对象中获取当前持有的链接,进行回滚操作
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
con.rollback();
// 省略 try catch 块...
}
}
|
最终会调用 con.rollback()
进行事务的回滚。
而通过回滚点回滚,最终会调用到这里:
1
2
3
4
5
6
7
8
9
10
|
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
// 省略 try catch 块...
}
}
|
4 数据库连接的释放
无论是事务提交或是回滚,之后,都会调用一个方法 triggerAfterCompletion(),在这个方法中,就完成了数据库连接的释放。
在真正释放之前,同样也会调用已注册的 TransactionSynchronization 的一个扩展方法 resume()。
连接的释放,实际上就是通过 TransactionSynchronization 的扩展方法来实现的,这个方法是 afterCompletion()。
在 triggerAfterCompletion() 中,调用了 invokeAfterCompletion()。在这里就遍历了 synchronizations,进行 afterCompletion() 的调用。
源码如下(省略了try catch 代码块):
1
2
3
4
5
6
7
8
9
10
|
public abstract class TransactionSynchronizationUtils {
public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
int completionStatus) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCompletion(completionStatus);
}
}
}
}
|
在 afterCompletion() 中经历了以下调用链:
releaseConnection() -> doReleaseConnection() -> doCloseConnection() -> con.close()
以 druid 连接池为例,最终会调用到 recycle(),在recycle中,有这么一段代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {
private volatile DruidConnectionHolder[] connections;
// 步骤一
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
// 省略无关代码...
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
// 这个方法,将连接归还。
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
//省略无关代码...
}
// 步骤二
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || e.discard) {
return false;
}
e.lastActiveTimeMillis = lastActiveTimeMillis;
// 这里,将连接归还到数组的指定索引处。
connections[poolingCount] = e;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}
notEmpty.signal();
notEmptySignalCount++;
return true;
}
}
|
从上面代码中可以看到,在 result = putLast(holder, currentTimeMillis);
中,就将连接进行回收了。