spring源码系列文章,示例代码的中文注释,均是 copy 自 https://gitee.com/wlizhi/spring-framework 。
链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。
1 编程式设置扩展操作
在spring声明式事务中,事务提交前、以及提交后都有相应的策略模式去调用一些扩展方法。在开发过程中,如果我们有这样的需求,在一个事务提交后做一些操作,就可以用到这个功能。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
@Transactional
@Override
public int addUserRequiredWithTransactionSynchronization(PropagationUser user) {
userDao.insertSelective(user);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
log.info("事务成功提交了");
}
});
return 1;
}
}
|
上面的示例代码,在新增用户后,做一些自定义的操作,比如用户注册通知等。
我们可以在方法结尾,使用 TransactionSynchronizationManager.registerSynchronization
,去注入一个事件,通过重写相应的方法,可以在事务提交流程的不同节点插入自定义的操作。
如以上代码,是在事务提交后,做的一个自定义操作。它会将这个事件对象,存储到一个线程绑定的变量中。在事务提交完成后,去获取这个对象,然后调用afterCommit()。
看一下这个方法的源码:
1
2
3
4
5
6
7
8
9
|
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
public static void registerSynchronization(TransactionSynchronization synchronization)
throws IllegalStateException {
// 注册事务事件,后面再各个节点会调用到事件中的回调方法
// 省略无关代码...
synchronizations.get().add(synchronization);
}
}
|
在事务的通知方法中,执行完提交操作后,最终会调用到 invokeAfterCommit(),源码如下 :
1
2
3
4
5
6
7
8
9
10
|
public abstract class TransactionSynchronizationUtils {
public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}
}
|
在我们注册的时候,像synchronizations中添加了我们自定义的事件,这个线程绑定的静态常量,是一个 Set 集合,也就是说,这个事件是可以注册多个的。
然后在事务提交后,遍历 synchronizations,并调用每一个事件的 afterCommit();
从源码中可以看到,这个过程是一个同步操作,在事务提交后插入操作,只是不影响事务的提交以及连接的归还,对业务方法的返回,还是会有一些影响的,所以这里一般会改为异步操作,通过本地线程池或者发送到mq来进行异步处理。
2 注解方式设置扩展操作
示例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
@Autowired
PropagationUserDao userDao;
@Autowired
ApplicationEventPublisher eventPublisher;
@Transactional
@Override
public int addUserRequiredWithApplicationEventPublisher2(PropagationUser user) {
userDao.insertSelective(user);
eventPublisher.publishEvent(new UserAddEvent(user));
return 1;
}
static class UserAddEvent extends ApplicationEvent {
public UserAddEvent(PropagationUser user) {
super(user);
}
}
@Component
static class UserEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void processUserAddEvent(UserAddEvent user) {
log.info("事务提交后用户信息处理,用户信息:{}", user.getSource());
}
}
}
|
在实例中注入 ApplicationEventPublisher,通过调用这个实例的 publishEvent(),来发布事件。
在另一个方法中,使用 @TransactionalEventListener 注解到方法上来接收这个事件,将事件注入到方法参数中,然后在方法中定义业务处理。
实际上,它和TransactionSynchronizationManager.registerSynchronization()
本质上是一样的,因为这个注解方式,底层就是基于 registerSynchronization() 来实现的。
在 publishEvent() 源码中经历了以下调用链:
publishEvent() -> multicastEvent() -> invokeListener() -> doInvokeListener() -> onApplicationEvent()
onApplicationEvent() 是 ApplicationListenerMethodTransactionalAdapter 中的一个方法,源码如下:
1
2
3
4
5
6
7
8
9
10
|
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
// 在这里注册了事件
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
} // 省略无关代码...
}
}
|
从源码中可以看到,它最终也是通过调用 registerSynchronization()
来进行事件的注入的。
匹配监听器的过程源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public abstract class AbstractApplicationEventMulticaster
implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
List<ApplicationListener<?>> allListeners = new ArrayList<>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
// 这里会获取到容器中所有已注册的监听器,进行匹配,匹配到之后。
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
for (ApplicationListener<?> listener : listeners) {
// 关键点在这里,supportsEvent() 中判断了是否支持这个事件
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
// 省略...
return allListeners;
}
}
|
在上面源码中,获取到容器中所有的监听器,然后遍历,判断是否支持当前事件,如果支持,则将其添加到列表,最终经过一些处理后返回。
其实在外层方法调用这里的时候,是做了缓存的。只有第一次调用的时候,会走到这里,进行遍历并判断。之后再获取,都是从缓存中获取的。
再深入看的话,可以看一下 TransactionSynchronizationEventAdapter类,实际上注册的实例就是这个类型的。
源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {
private final ApplicationListenerMethodAdapter listener;
private final ApplicationEvent event;
private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.phase == TransactionPhase.BEFORE_COMMIT) {
processEvent();
}
}
@Override
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
protected void processEvent() {
// 总是会调用到这里,这里其实就是我们加了@TransactionalEventListener注解的方法调用
this.listener.processEvent(this.event);
}
}
|
可以看到,这个 TransactionSynchronization 实现类里面,定义了一个执行骨架,它最终会在合适的节点调用 processEvent(),这个方法就是我们使用 @TransactionalEventListener 注解的方法。
这个方法定义在它的父类里,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
public void processEvent(ApplicationEvent event) {
Object[] args = resolveArguments(event);
if (shouldHandle(event, args)) {
// 这里调用了我们定义的处理方法
Object result = doInvoke(args);
if (result != null) {
handleResult(result);
}
else {
logger.trace("No result object given - no result to handle");
}
}
}
}
|
继续跟进源码,到 doInvoke() 中:
1
2
3
4
5
6
7
8
9
10
|
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
@Nullable
protected Object doInvoke(Object... args) {
Object bean = getTargetBean();
ReflectionUtils.makeAccessible(this.method);
return this.method.invoke(bean, args);
// 省略 try catch 代码块
}
}
|
最终通过反射,调用我们自己定义的处理方法。