警告
本文最后更新于 2020-12-07,文中内容可能已过时。
spring源码系列文章,示例代码的中文注释,均是 copy 自 https://gitee.com/wlizhi/spring-framework 。
链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。
1 编程式设置扩展操作
在spring声明式事务中,事务提交前、以及提交后都有相应的策略模式去调用一些扩展方法。在开发过程中,如果我们有这样的需求,在一个事务提交后做一些操作,就可以用到这个功能。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
@Transactional
@Override
public int addUserRequiredWithTransactionSynchronization ( PropagationUser user ) {
userDao . insertSelective ( user );
TransactionSynchronizationManager . registerSynchronization ( new TransactionSynchronization () {
@Override
public void afterCommit () {
log . info ( "事务成功提交了" );
}
});
return 1 ;
}
}
上面的示例代码,在新增用户后,做一些自定义的操作,比如用户注册通知等。
我们可以在方法结尾,使用 TransactionSynchronizationManager.registerSynchronization
,去注入一个事件,通过重写相应的方法,可以在事务提交流程的不同节点插入自定义的操作。
如以上代码,是在事务提交后,做的一个自定义操作。它会将这个事件对象,存储到一个线程绑定的变量中。在事务提交完成后,去获取这个对象,然后调用afterCommit()。
看一下这个方法的源码:
1
2
3
4
5
6
7
8
9
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal < Set < TransactionSynchronization >> synchronizations = new NamedThreadLocal <> ( "Transaction synchronizations" );
public static void registerSynchronization ( TransactionSynchronization synchronization )
throws IllegalStateException {
// 注册事务事件,后面再各个节点会调用到事件中的回调方法
// 省略无关代码...
synchronizations . get (). add ( synchronization );
}
}
在事务的通知方法中,执行完提交操作后,最终会调用到 invokeAfterCommit(),源码如下 :
1
2
3
4
5
6
7
8
9
10
public abstract class TransactionSynchronizationUtils {
public static void invokeAfterCommit ( @Nullable List < TransactionSynchronization > synchronizations ) {
if ( synchronizations != null ) {
for ( TransactionSynchronization synchronization : synchronizations ) {
synchronization . afterCommit ();
}
}
}
}
在我们注册的时候,像synchronizations中添加了我们自定义的事件,这个线程绑定的静态常量,是一个 Set 集合,也就是说,这个事件是可以注册多个的。
然后在事务提交后,遍历 synchronizations,并调用每一个事件的 afterCommit();
从源码中可以看到,这个过程是一个同步操作,在事务提交后插入操作,只是不影响事务的提交以及连接的归还,对业务方法的返回,还是会有一些影响的,所以这里一般会改为异步操作,通过本地线程池或者发送到mq来进行异步处理。
2 注解方式设置扩展操作
示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
@Autowired
PropagationUserDao userDao ;
@Autowired
ApplicationEventPublisher eventPublisher ;
@Transactional
@Override
public int addUserRequiredWithApplicationEventPublisher2 ( PropagationUser user ) {
userDao . insertSelective ( user );
eventPublisher . publishEvent ( new UserAddEvent ( user ));
return 1 ;
}
static class UserAddEvent extends ApplicationEvent {
public UserAddEvent ( PropagationUser user ) {
super ( user );
}
}
@Component
static class UserEventListener {
@TransactionalEventListener ( phase = TransactionPhase . AFTER_COMMIT )
public void processUserAddEvent ( UserAddEvent user ) {
log . info ( "事务提交后用户信息处理,用户信息:{}" , user . getSource ());
}
}
}
在实例中注入 ApplicationEventPublisher,通过调用这个实例的 publishEvent(),来发布事件。
在另一个方法中,使用 @TransactionalEventListener 注解到方法上来接收这个事件,将事件注入到方法参数中,然后在方法中定义业务处理。
实际上,它和TransactionSynchronizationManager.registerSynchronization()
本质上是一样的,因为这个注解方式,底层就是基于 registerSynchronization() 来实现的。
在 publishEvent() 源码中经历了以下调用链:
publishEvent() -> multicastEvent() -> invokeListener() -> doInvokeListener() -> onApplicationEvent()
onApplicationEvent() 是 ApplicationListenerMethodTransactionalAdapter 中的一个方法,源码如下:
1
2
3
4
5
6
7
8
9
10
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
@Override
public void onApplicationEvent ( ApplicationEvent event ) {
if ( TransactionSynchronizationManager . isSynchronizationActive ()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization ( event );
// 在这里注册了事件
TransactionSynchronizationManager . registerSynchronization ( transactionSynchronization );
} // 省略无关代码...
}
}
从源码中可以看到,它最终也是通过调用 registerSynchronization()
来进行事件的注入的。
匹配监听器的过程源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractApplicationEventMulticaster
implements ApplicationEventMulticaster , BeanClassLoaderAware , BeanFactoryAware {
private Collection < ApplicationListener <?>> retrieveApplicationListeners (
ResolvableType eventType , @Nullable Class <?> sourceType , @Nullable ListenerRetriever retriever ) {
List < ApplicationListener <?>> allListeners = new ArrayList <> ();
Set < ApplicationListener <?>> listeners ;
Set < String > listenerBeans ;
// 这里会获取到容器中所有已注册的监听器,进行匹配,匹配到之后。
synchronized ( this . retrievalMutex ) {
listeners = new LinkedHashSet <> ( this . defaultRetriever . applicationListeners );
listenerBeans = new LinkedHashSet <> ( this . defaultRetriever . applicationListenerBeans );
}
for ( ApplicationListener <?> listener : listeners ) {
// 关键点在这里,supportsEvent() 中判断了是否支持这个事件
if ( supportsEvent ( listener , eventType , sourceType )) {
if ( retriever != null ) {
retriever . applicationListeners . add ( listener );
}
allListeners . add ( listener );
}
}
// 省略...
return allListeners ;
}
}
在上面源码中,获取到容器中所有的监听器,然后遍历,判断是否支持当前事件,如果支持,则将其添加到列表,最终经过一些处理后返回。
其实在外层方法调用这里的时候,是做了缓存的。只有第一次调用的时候,会走到这里,进行遍历并判断。之后再获取,都是从缓存中获取的。
再深入看的话,可以看一下 TransactionSynchronizationEventAdapter类,实际上注册的实例就是这个类型的。
源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {
private final ApplicationListenerMethodAdapter listener ;
private final ApplicationEvent event ;
private final TransactionPhase phase ;
public TransactionSynchronizationEventAdapter ( ApplicationListenerMethodAdapter listener ,
ApplicationEvent event , TransactionPhase phase ) {
this . listener = listener ;
this . event = event ;
this . phase = phase ;
}
@Override
public int getOrder () {
return this . listener . getOrder ();
}
@Override
public void beforeCommit ( boolean readOnly ) {
if ( this . phase == TransactionPhase . BEFORE_COMMIT ) {
processEvent ();
}
}
@Override
public void afterCompletion ( int status ) {
if ( this . phase == TransactionPhase . AFTER_COMMIT && status == STATUS_COMMITTED ) {
processEvent ();
}
else if ( this . phase == TransactionPhase . AFTER_ROLLBACK && status == STATUS_ROLLED_BACK ) {
processEvent ();
}
else if ( this . phase == TransactionPhase . AFTER_COMPLETION ) {
processEvent ();
}
}
protected void processEvent () {
// 总是会调用到这里,这里其实就是我们加了@TransactionalEventListener注解的方法调用
this . listener . processEvent ( this . event );
}
}
可以看到,这个 TransactionSynchronization 实现类里面,定义了一个执行骨架,它最终会在合适的节点调用 processEvent(),这个方法就是我们使用 @TransactionalEventListener 注解的方法。
这个方法定义在它的父类里,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
public void processEvent ( ApplicationEvent event ) {
Object [] args = resolveArguments ( event );
if ( shouldHandle ( event , args )) {
// 这里调用了我们定义的处理方法
Object result = doInvoke ( args );
if ( result != null ) {
handleResult ( result );
}
else {
logger . trace ( "No result object given - no result to handle" );
}
}
}
}
继续跟进源码,到 doInvoke() 中:
1
2
3
4
5
6
7
8
9
10
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
@Nullable
protected Object doInvoke ( Object ... args ) {
Object bean = getTargetBean ();
ReflectionUtils . makeAccessible ( this . method );
return this . method . invoke ( bean , args );
// 省略 try catch 代码块
}
}
最终通过反射,调用我们自己定义的处理方法。