Spring事务部分源码解析(四) - TransactionSynchronization注册

通过之前的文章我们已经知道对于一个事务执行多个业务,是通过实现TransactionSynchronization,当提交和回滚是回调完成.那么什么来注册TransactionSynchronization呢

TransactionSynchronizationManager

我们需要看一下registerSynchronization()方法的调用方,我们通过调用就可以知道第三方厂商实现调用的地方

image-20190926192218024

按照调用的引用关系. 目前我们的项目中有MyBatis, RabbitMQ, MongoDB, Redis, 和Spring jdbc有实现

接下来我们一步步看都是在何时注册

Mybatis

SqlSessionUtils#registerSessionHolder()

image-20190926192544701

SqlSessionSynchronization

我们看一下具体实现TransactionSynchronization相关的commit和completion代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void beforeCommit(boolean readOnly) { 
if (TransactionSynchronizationManager.isActualTransactionActive()) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Transaction synchronization committing SqlSession [" + this.holder.getSqlSession() + "]");
}
this.holder.getSqlSession().commit();
} catch (PersistenceException p) {
if (this.holder.getPersistenceExceptionTranslator() != null) {
DataAccessException translated = this.holder
.getPersistenceExceptionTranslator()
.translateExceptionIfPossible(p);
if (translated != null) {
throw translated;
}
}
throw p;
}
}
}

public void beforeCompletion() {
// Issue #18 Close SqlSession and deregister it now
// because afterCompletion may be called from a different thread
if (!this.holder.isOpen()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
}
TransactionSynchronizationManager.unbindResource(sessionFactory);
this.holderActive = false;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
}
this.holder.getSqlSession().close();
}
}

public void afterCompletion(int status) {
if (this.holderActive) {
// afterCompletion may have been called from a different thread
// so avoid failing if there is nothing in this one
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
}
TransactionSynchronizationManager.unbindResourceIfPossible(sessionFactory);
this.holderActive = false;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
}
this.holder.getSqlSession().close();
}
this.holder.reset();
}

这里有一个点: 我们发现MyBatis里只是实现了commit, holder的失效和resource的解绑. 并没有实现rollback.

个人觉得mybatis可能是觉得依赖于Connection的commit.因为本身在TransactionSynchronization的概念里,没有rollback只有commit和completion. 而Mybatis一定是依赖于ConnectionHolder,因此一定可以使用Connection的rollback.

但是我觉得我的理解是有问题的,因为MyBatis本身自己就实现了一套commit和rollback机制,当然底层还是使用Connection的commit和rollback.只不过是增加了一些缓存相关的处理.

image-20190926194523630

通过方法名我们便可以看出,每当获取Session是,就会注册对应的TransactionSynchronization

RabbitMQ

ConnectionFactoryUtils#bindResourceToTransaction()

image-20190926194726213

RabbitResourceSynchronization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void afterCompletion(int status) {
if (status == TransactionSynchronization.STATUS_COMMITTED) {
this.resourceHolder.commitAll();
}
else {
this.resourceHolder.rollbackAll();
}

if (this.resourceHolder.isReleaseAfterCompletion()) {
this.resourceHolder.setSynchronizedWithTransaction(false);
}
super.afterCompletion(status);
}

protected void releaseResource(RabbitResourceHolder resourceHolder, Object resourceKey) {
ConnectionFactoryUtils.releaseResources(resourceHolder);
}

Rabbit是已经实现了根据TransactionStatus来决定是commit还是rollback

ConnectionFactoryUtils#doGetTransactionalResourceHolder()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionFactory connectionFactory,
ResourceFactory resourceFactory) {

Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
Assert.notNull(resourceFactory, "ResourceFactory must not be null");

RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
if (resourceHolder != null) {
Channel channel = resourceFactory.getChannel(resourceHolder);
if (channel != null) {
return resourceHolder;
}
}
RabbitResourceHolder resourceHolderToUse = resourceHolder;
if (resourceHolderToUse == null) {
resourceHolderToUse = new RabbitResourceHolder();
}
Connection connection = resourceFactory.getConnection(resourceHolderToUse); //NOSONAR
Channel channel = null;
try {
/*
* If we are in a listener container, first see if there's a channel registered
* for this consumer and the consumer is using the same connection factory.
*/
channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
if (channel == null && connection == null) {
connection = resourceFactory.createConnection();
if (resourceHolder == null) {
/*
* While creating a connection, a connection listener might have created a
* transactional channel and bound it to the transaction.
*/
resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(connectionFactory);
if (resourceHolder != null) {
channel = resourceHolder.getChannel();
resourceHolderToUse = resourceHolder;
}
}
resourceHolderToUse.addConnection(connection);
}
if (channel == null) {
channel = resourceFactory.createChannel(connection);
}
resourceHolderToUse.addChannel(channel, connection);
if (resourceHolderToUse != resourceHolder) {
bindResourceToTransaction(resourceHolderToUse, connectionFactory,
resourceFactory.isSynchedLocalTransactionAllowed());
}
return resourceHolderToUse;

}
catch (IOException ex) {
RabbitUtils.closeConnection(connection);
throw new AmqpIOException(ex);
}
}

从方法名上看是获取TransactionResouceHolder.有点懵,我们继续往上看调用方

image-20190926195649277

image-20190926195743569

好了. 看到doExecute就知道是我们执行RabbitTemplate#convertAndSend()的具体实现

MongoDB

MongoDbUtils#doGetDB()

image-20190926195934239

可以理解为建立连接的时候就就直接注册

MongoSynchronization

image-20190926200013448

没什么好说的了,继承父类的绑定解绑Resource.没有自己的实现

Redis

RedisConnectionUtils#potentiallyRegisterTransactionSynchronisation()

image-20190926200206610

可以看到是获取连接的时候进行注册

RedisTransactionSynchronizer

image-20190926200343949

虽然Redis的事务是基于乐观锁实现,很鸡肋,但是至少也是人家的一种态度.
我们看一下他的实现和Rabbit类似,都是继续status来commit或者rollback(discard表示取消事务)

RedisConnectionUtils#doGetConnection()

image-20190926200253579

Spring JDBC

DataSourceUtils#doGetConnection

image-20190926200808146

ConnectionSynchronization

image-20190926200835955

这一部分发现Spring Jdbc主要是先除了解绑dataSource,还有一个作用是关闭连接

总结

通过注册TransactionSynchronization实现类的代码,我们可以发现.这一部分其实是Spring提供开放性扩展的一个地方.首先Spring提供@Transactional注解,如果让自己来控制事务相关,假如一个事务中又有jdbc, 又有rabbit,又有redis,想一下自己控制的话需要写多少的冗余代码.但是spring提供的这个特性,和TransactionManager无关,在任何的TransactionManager中只要注册就可以执行.

例如: RabbitTemplate, RedisTemplate,并不依赖TransactionManager,只需要Connection就可自动和@Transactional集成.

现在我们串一下:

执行@Transactional方法,执行TransactionAspectSupport#invokeWithinTransaction(), 开启事务,执行方法(rabbitTemplate, redisTempalte等在方法中执行是会自动注册TransactionSynchronization至线程安全TransactionSynchronizationManager),异常设置线程共享变量rollbackOnly为true. 提交事务或回滚(TransactionSynchronizationManager#getSynchronizations(),然后遍历执行对应的方法)

好了, Spring事务相关你的代码已经写完.
接下来开始准备Dubbo的源码解析.


Spring事务部分源码解析(四) - TransactionSynchronization注册
https://gallrax.github.io/2019/09/18/Spring事务部分源码解析(四)-TransactionSynchronization注册/
作者
Gallrax
发布于
2019年9月18日
许可协议