目录

20 声明式事务扩展 TransactionSynchronization

spring源码系列文章,示例代码的中文注释,均是 copy 自 https://gitee.com/wlizhi/spring-framework

链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。


1 编程式设置扩展操作

在spring声明式事务中,事务提交前、以及提交后都有相应的策略模式去调用一些扩展方法。在开发过程中,如果我们有这样的需求,在一个事务提交后做一些操作,就可以用到这个功能。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
	@Transactional
	@Override
	public int addUserRequiredWithTransactionSynchronization(PropagationUser user) {
		userDao.insertSelective(user);
		TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
			@Override
			public void afterCommit() {
				log.info("事务成功提交了");
			}
		});
		return 1;
	}
}

上面的示例代码,在新增用户后,做一些自定义的操作,比如用户注册通知等。

我们可以在方法结尾,使用 TransactionSynchronizationManager.registerSynchronization,去注入一个事件,通过重写相应的方法,可以在事务提交流程的不同节点插入自定义的操作。

如以上代码,是在事务提交后,做的一个自定义操作。它会将这个事件对象,存储到一个线程绑定的变量中。在事务提交完成后,去获取这个对象,然后调用afterCommit()。

看一下这个方法的源码:

1
2
3
4
5
6
7
8
9
public abstract class TransactionSynchronizationManager {
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
    public static void registerSynchronization(TransactionSynchronization synchronization)
			throws IllegalStateException {
		// 注册事务事件,后面再各个节点会调用到事件中的回调方法
		// 省略无关代码...
		synchronizations.get().add(synchronization);
	}
}

在事务的通知方法中,执行完提交操作后,最终会调用到 invokeAfterCommit(),源码如下 :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public abstract class TransactionSynchronizationUtils {

    public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
		if (synchronizations != null) {
			for (TransactionSynchronization synchronization : synchronizations) {
				synchronization.afterCommit();
			}
		}
	}
}

在我们注册的时候,像synchronizations中添加了我们自定义的事件,这个线程绑定的静态常量,是一个 Set 集合,也就是说,这个事件是可以注册多个的。

然后在事务提交后,遍历 synchronizations,并调用每一个事件的 afterCommit();

从源码中可以看到,这个过程是一个同步操作,在事务提交后插入操作,只是不影响事务的提交以及连接的归还,对业务方法的返回,还是会有一些影响的,所以这里一般会改为异步操作,通过本地线程池或者发送到mq来进行异步处理。

2 注解方式设置扩展操作

示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Service
public class TransactionSourceExtentionServiceImpl implements TransactionSourceExtentionService {
	@Autowired
	PropagationUserDao userDao;

	@Autowired
	ApplicationEventPublisher eventPublisher;

	@Transactional
	@Override
	public int addUserRequiredWithApplicationEventPublisher2(PropagationUser user) {
		userDao.insertSelective(user);
		eventPublisher.publishEvent(new UserAddEvent(user));
		return 1;
	}

	static class UserAddEvent extends ApplicationEvent {
		public UserAddEvent(PropagationUser user) {
			super(user);
		}
	}

	@Component
	static class UserEventListener {
		@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
		public void processUserAddEvent(UserAddEvent user) {
			log.info("事务提交后用户信息处理,用户信息:{}", user.getSource());
		}
	}
}

在实例中注入 ApplicationEventPublisher,通过调用这个实例的 publishEvent(),来发布事件。

在另一个方法中,使用 @TransactionalEventListener 注解到方法上来接收这个事件,将事件注入到方法参数中,然后在方法中定义业务处理。

实际上,它和TransactionSynchronizationManager.registerSynchronization() 本质上是一样的,因为这个注解方式,底层就是基于 registerSynchronization() 来实现的。

在 publishEvent() 源码中经历了以下调用链:

publishEvent() -> multicastEvent() -> invokeListener() -> doInvokeListener() -> onApplicationEvent()

onApplicationEvent() 是 ApplicationListenerMethodTransactionalAdapter 中的一个方法,源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
    @Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
			// 在这里注册了事件
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		} // 省略无关代码...
	}
}

从源码中可以看到,它最终也是通过调用 registerSynchronization() 来进行事件的注入的。

匹配监听器的过程源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractApplicationEventMulticaster
		implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
    private Collection<ApplicationListener<?>> retrieveApplicationListeners(
			ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
    
		List<ApplicationListener<?>> allListeners = new ArrayList<>();
		Set<ApplicationListener<?>> listeners;
		Set<String> listenerBeans;
		// 这里会获取到容器中所有已注册的监听器,进行匹配,匹配到之后。
		synchronized (this.retrievalMutex) {
			listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
			listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
		}
		for (ApplicationListener<?> listener : listeners) {
            // 关键点在这里,supportsEvent() 中判断了是否支持这个事件
			if (supportsEvent(listener, eventType, sourceType)) {
				if (retriever != null) {
					retriever.applicationListeners.add(listener);
				}
				allListeners.add(listener);
			}
		}
		// 省略...
		return allListeners;
	}
}

在上面源码中,获取到容器中所有的监听器,然后遍历,判断是否支持当前事件,如果支持,则将其添加到列表,最终经过一些处理后返回。

其实在外层方法调用这里的时候,是做了缓存的。只有第一次调用的时候,会走到这里,进行遍历并判断。之后再获取,都是从缓存中获取的。

再深入看的话,可以看一下 TransactionSynchronizationEventAdapter类,实际上注册的实例就是这个类型的。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

    private final ApplicationListenerMethodAdapter listener;

    private final ApplicationEvent event;

    private final TransactionPhase phase;

    public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
            ApplicationEvent event, TransactionPhase phase) {

        this.listener = listener;
        this.event = event;
        this.phase = phase;
    }

    @Override
    public int getOrder() {
        return this.listener.getOrder();
    }

    @Override
    public void beforeCommit(boolean readOnly) {
        if (this.phase == TransactionPhase.BEFORE_COMMIT) {
            processEvent();
        }
    }

    @Override
    public void afterCompletion(int status) {
        if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
            processEvent();
        }
        else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
            processEvent();
        }
        else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
            processEvent();
        }
    }

    protected void processEvent() {
        // 总是会调用到这里,这里其实就是我们加了@TransactionalEventListener注解的方法调用
        this.listener.processEvent(this.event);
    }
}

可以看到,这个 TransactionSynchronization 实现类里面,定义了一个执行骨架,它最终会在合适的节点调用 processEvent(),这个方法就是我们使用 @TransactionalEventListener 注解的方法。

这个方法定义在它的父类里,源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {

    public void processEvent(ApplicationEvent event) {
		Object[] args = resolveArguments(event);
		if (shouldHandle(event, args)) {
            // 这里调用了我们定义的处理方法
			Object result = doInvoke(args);
			if (result != null) {  
				handleResult(result);
			}
			else {
				logger.trace("No result object given - no result to handle");
			}
		}
	}
}

继续跟进源码,到 doInvoke() 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {

    @Nullable
	protected Object doInvoke(Object... args) {
		Object bean = getTargetBean();
		ReflectionUtils.makeAccessible(this.method);
        return this.method.invoke(bean, args);
		// 省略 try catch 代码块
	}
}

最终通过反射,调用我们自己定义的处理方法。