Spring Transaction Source Code Walkthrough (3) - TransactionSynchronization

Next, let's read the source code that decides when the synchronization callbacks are triggered.

## AbstractPlatformTransactionManager


There are four trigger events: before and after commit, and before and after completion.
The code below makes the order easier to see.

```java
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                doCommit(status);
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (globalRollbackOnly) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        catch (Error err) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, err);
            throw err;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}
```

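> Commit flow:
>
> 1. prepareForCommit (an extension hook)
> 2. triggerBeforeCommit
> 3. triggerBeforeCompletion
> 4. Commit (release the held savepoint, or call doCommit for a new transaction)
>    1. On UnexpectedRollbackException, the transaction was already marked rollback-only, so triggerAfterCompletion runs with STATUS_ROLLED_BACK. On any other TransactionException, doRollbackOnCommitException runs if isRollbackOnCommitFailure() is true; otherwise triggerAfterCompletion runs with STATUS_UNKNOWN. The exception is rethrown in both cases.
>    2. On RuntimeException or Error, triggerBeforeCompletion is called first if it has not run yet, then doRollbackOnCommitException handles the rollback and the exception is rethrown.
> 5. triggerAfterCommit
> 6. triggerAfterCompletion (STATUS_COMMITTED)
> 7. cleanupAfterCompletion (always runs, from the finally block)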
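
To make the ordering concrete, here is a minimal, self-contained sketch that only records the order of the callbacks. It is not Spring code: `CommitFlowSketch`, `processCommit(List, boolean)` and `trace` are hypothetical names, and the failure branch assumes that the rollback helper finishes by firing afterCompletion with a rolled-back status, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the callback order in processCommit; no Spring types involved.
class CommitFlowSketch {

    // Appends one event name per step so the order can be inspected afterwards.
    static void processCommit(List<String> events, boolean commitFails) {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                events.add("beforeCommit");
                events.add("beforeCompletion");
                beforeCompletionInvoked = true;
                if (commitFails) {
                    throw new IllegalStateException("simulated commit failure");
                }
                events.add("doCommit");
            } catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    events.add("beforeCompletion");
                }
                // Assumption: the rollback helper ends by notifying afterCompletion.
                events.add("afterCompletion(ROLLED_BACK)");
                throw ex;
            }
            try {
                events.add("afterCommit");
            } finally {
                events.add("afterCompletion(COMMITTED)");
            }
        } finally {
            events.add("cleanup");
        }
    }

    // Runs the model once and returns the recorded events, swallowing the rethrown failure.
    static List<String> trace(boolean commitFails) {
        List<String> events = new ArrayList<>();
        try {
            processCommit(events, commitFails);
        } catch (RuntimeException ignored) {
            // The real method propagates the exception; the model only cares about ordering.
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

In the failing run, afterCommit never appears, while cleanup still runs last because it sits in the outer finally block.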

```java
private void processRollback(DefaultTransactionStatus status) {
    try {
        try {
            triggerBeforeCompletion(status);
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            }
            else if (status.hasTransaction()) {
                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                    }
                    doSetRollbackOnly(status);
                }
                else {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                    }
                }
            }
            else {
                logger.debug("Should roll back transaction but cannot - no transaction available");
            }
        }
        catch (RuntimeException ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }
        catch (Error err) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw err;
        }
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
```


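> Rollback flow:
>
> 1. triggerBeforeCompletion
> 2. Roll back (this does not necessarily perform a real rollback; a participating transaction may only be marked rollback-only)
>    1. If a RuntimeException or Error occurs, triggerAfterCompletion runs with STATUS_UNKNOWN and the exception is rethrown
> 3. triggerAfterCompletion (STATUS_ROLLED_BACK)
> 4. cleanupAfterCompletion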
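
The three-way branch in step 2 is the part that is easy to miss, so here is a small sketch of it. Everything in it is hypothetical (`RollbackFlowSketch`, the `Kind` enum, the event strings), and it simplifies the participating case by always marking rollback-only, whereas the real method first checks `isLocalRollbackOnly()` and `isGlobalRollbackOnParticipationFailure()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the branches in processRollback; no Spring types involved.
class RollbackFlowSketch {

    enum Kind { SAVEPOINT, NEW_TRANSACTION, PARTICIPATING }

    // Returns the ordered list of steps taken for the given kind of transaction status.
    static List<String> processRollback(Kind kind) {
        List<String> events = new ArrayList<>();
        try {
            events.add("beforeCompletion");
            switch (kind) {
                case SAVEPOINT:
                    events.add("rollbackToHeldSavepoint");
                    break;
                case NEW_TRANSACTION:
                    events.add("doRollback");
                    break;
                case PARTICIPATING:
                    // Simplification: only flags the outer transaction, nothing is rolled back here.
                    events.add("doSetRollbackOnly");
                    break;
            }
            events.add("afterCompletion(ROLLED_BACK)");
        } finally {
            events.add("cleanup");
        }
        return events;
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " -> " + processRollback(kind));
        }
    }
}
```

Only the NEW_TRANSACTION case reaches the resource-level rollback; the other two either rewind to a savepoint or leave the decision to the transaction originator.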

### triggerBeforeCommit

![image-20190918181806849](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3pm79kkj30d705o74q-20230318170731895.jpg)

![image-20190918185623952](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3pon0fej30i904ft92-20230318170852690.jpg)

![image-20190918185926385](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3pqty7dj30p902uq37-20230318170732233.jpg)

### triggerAfterCommit

![image-20190918181905137](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3ptnsmoj30d5057q39-20230318170848503.jpg)

![image-20190918185648103](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3py2hjyj30fc04dglw-20230318170732501.jpg)

![image-20190918185953086](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3q083q6j30hs01tgln-20230318170732880.jpg)

![image-20190918190011335](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3q22tc4j30ka03ydg5-20230318170844018.jpg)

### triggerBeforeCompletion

![image-20190918182207864](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3q4wob4j30df07u0tg-20230318170840701.jpg)

![image-20190918185712074](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3q8bv4rj30hx04cq3a-20230318170733218.jpg)

![image-20190918190053220](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qc1pqij30pa05fjrv-20230318170733455.jpg)

### triggerAfterCompletion

![image-20190918182444899](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qez5icj30k80adgnf-20230318170733727.jpg)

![image-20190918185733079](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qj1bxdj30pz0arabr-20230318170833943.jpg)

![image-20190918190737482](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qlv9fyj30pw06fmxu-20230318170831315.jpg)

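At this point our focus should shift to TransactionSynchronization. TransactionSynchronizationUtils does little more than fetch the registered TransactionSynchronization instances through TransactionSynchronizationManager.getSynchronizations() and then iterate over them, invoking each one.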
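
The "fetch the list, then loop" pattern can be sketched in a few lines. This is a stand-alone model, not the Spring utility: `SyncDispatchSketch`, its nested `Sync` interface and the `demo` method are hypothetical. The choice to swallow exceptions thrown from afterCompletion reflects my understanding of how Spring treats that callback and should be read as an assumption, since the screenshots above are the only source here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of dispatching callbacks to every registered synchronization.
class SyncDispatchSketch {

    static final int STATUS_COMMITTED = 0; // illustrative constant for this sketch

    interface Sync {
        default void beforeCommit() {}
        default void afterCompletion(int status) {}
    }

    // A failure in beforeCommit propagates and can therefore abort the commit.
    static void triggerBeforeCommit(List<Sync> syncs) {
        for (Sync sync : syncs) {
            sync.beforeCommit();
        }
    }

    // Assumption: one failing afterCompletion must not prevent the others from running.
    static void invokeAfterCompletion(List<Sync> syncs, int status, List<String> log) {
        for (Sync sync : syncs) {
            try {
                sync.afterCompletion(status);
            } catch (Throwable t) {
                log.add("swallowed: " + t.getMessage());
            }
        }
    }

    // Registers one failing and one well-behaved synchronization and returns what happened.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Sync> syncs = new ArrayList<>();
        syncs.add(new Sync() {
            @Override
            public void afterCompletion(int status) {
                throw new IllegalStateException("first failed");
            }
        });
        syncs.add(new Sync() {
            @Override
            public void afterCompletion(int status) {
                log.add("second saw status " + status);
            }
        });
        invokeAfterCompletion(syncs, STATUS_COMMITTED, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```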


## TransactionSynchronization

![image-20190918191254211](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qomz6kj30hp0fv0ta-20230318170734199.jpg)

### Subclasses

![image-20190918191314396](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qrh7x5j31cr0a978w-20230318170734333.jpg)

### SqlSessionSynchronization (MyBatis)

![image-20190918221701492](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qturt4j30qi0jndig-20230318170734469.jpg)

### RabbitResourceSynchronization

![image-20190918191514543](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3qxllvzj30mo0i90ux-20230318170734629.jpg)

### RedisTransactionSynchronizer

![image-20190918215145038](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3r0evtrj30ni0pjq5g-20230318170823107.jpg)

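> These are only examples of how integrations implement the interface.
>
> Our current project uses RabbitMQ. When RabbitTemplate.channelTransacted = true, a TransactionSynchronization is bound; once a method annotated with @Transactional finishes, execution reaches this point and the RabbitMQ send is either committed or rolled back.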

We can see that two of these classes extend ResourceHolderSynchronization.


### ResourceHolderSynchronization

![image-20190918220026988](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3rc2ttmj30hp0fvabc-20230318170815738.jpg)

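> Compared with TransactionSynchronization, it adds the methods marked with the red box.
>
> The core commit and completion callbacks already have a template implementation here; the specifics are left to subclasses.
>
> Let's look at the code.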

```java
@Override
public void suspend() {
    if (this.holderActive) {
        TransactionSynchronizationManager.unbindResource(this.resourceKey);
    }
}

@Override
public void resume() {
    if (this.holderActive) {
        TransactionSynchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
    }
}

@Override
public void flush() {
    flushResource(this.resourceHolder);
}

@Override
public void beforeCommit(boolean readOnly) {
}

@Override
public void beforeCompletion() {
    if (shouldUnbindAtCompletion()) {
        TransactionSynchronizationManager.unbindResource(this.resourceKey);
        this.holderActive = false;
        if (shouldReleaseBeforeCompletion()) {
            releaseResource(this.resourceHolder, this.resourceKey);
        }
    }
}

@Override
public void afterCommit() {
    if (!shouldReleaseBeforeCompletion()) {
        processResourceAfterCommit(this.resourceHolder);
    }
}

@Override
public void afterCompletion(int status) {
    if (shouldUnbindAtCompletion()) {
        boolean releaseNecessary = false;
        if (this.holderActive) {
            // The thread-bound resource holder might not be available anymore,
            // since afterCompletion might get called from a different thread.
            this.holderActive = false;
            TransactionSynchronizationManager.unbindResourceIfPossible(this.resourceKey);
            this.resourceHolder.unbound();
            releaseNecessary = true;
        }
        else {
            releaseNecessary = shouldReleaseAfterCompletion(this.resourceHolder);
        }
        if (releaseNecessary) {
            releaseResource(this.resourceHolder, this.resourceKey);
        }
    }
    else {
        // Probably a pre-bound resource...
        cleanupResource(this.resourceHolder, this.resourceKey, (status == STATUS_COMMITTED));
    }
    this.resourceHolder.reset();
}

/**
 * Return whether this holder should be unbound at completion
 * (or should rather be left bound to the thread after the transaction).
 * <p>The default implementation returns {@code true}.
 */
protected boolean shouldUnbindAtCompletion() {
    return true;
}

/**
 * Return whether this holder's resource should be released before
 * transaction completion ({@code true}) or rather after
 * transaction completion ({@code false}).
 * <p>Note that resources will only be released when they are
 * unbound from the thread ({@link #shouldUnbindAtCompletion()}).
 * <p>The default implementation returns {@code true}.
 * @see #releaseResource
 */
protected boolean shouldReleaseBeforeCompletion() {
    return true;
}

/**
 * Return whether this holder's resource should be released after
 * transaction completion ({@code true}).
 * <p>The default implementation returns {@code !shouldReleaseBeforeCompletion()},
 * releasing after completion if no attempt was made before completion.
 * @see #releaseResource
 */
protected boolean shouldReleaseAfterCompletion(H resourceHolder) {
    return !shouldReleaseBeforeCompletion();
}

/**
 * Flush callback for the given resource holder.
 * @param resourceHolder the resource holder to flush
 */
protected void flushResource(H resourceHolder) {
}

/**
 * After-commit callback for the given resource holder.
 * Only called when the resource hasn't been released yet
 * ({@link #shouldReleaseBeforeCompletion()}).
 * @param resourceHolder the resource holder to process
 */
protected void processResourceAfterCommit(H resourceHolder) {
}

/**
 * Release the given resource (after it has been unbound from the thread).
 * @param resourceHolder the resource holder to process
 * @param resourceKey the key that the ResourceHolder was bound for
 */
protected void releaseResource(H resourceHolder, K resourceKey) {
}

/**
 * Perform a cleanup on the given resource (which is left bound to the thread).
 * @param resourceHolder the resource holder to process
 * @param resourceKey the key that the ResourceHolder was bound for
 * @param committed whether the transaction has committed ({@code true})
 * or rolled back ({@code false})
 */
protected void cleanupResource(H resourceHolder, K resourceKey, boolean committed) {
}
```

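> This part may read as a bit scattered, so here is a rough summary of what these pieces are for.
> The main extension point of TransactionSynchronization is afterCompletion: integrations such as JDBC, RabbitMQ, and Redis can release their transactional resources once execution has finished.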
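
To see how the hooks in ResourceHolderSynchronization decide when a resource is released, the following trimmed model may help. It is a sketch under assumptions, not the Spring class: `HolderSyncSketch` and `ModelSync` are hypothetical names, `shouldUnbindAtCompletion()` is assumed to stay at its default of `true`, and binding and releasing are reduced to event strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed model of the release logic in ResourceHolderSynchronization.
class HolderSyncSketch {

    static class ModelSync {
        final List<String> events = new ArrayList<>();
        private final boolean releaseBeforeCompletion;
        private boolean holderActive = true;

        ModelSync(boolean releaseBeforeCompletion) {
            this.releaseBeforeCompletion = releaseBeforeCompletion;
        }

        // Stands in for the overridable hook of the same name.
        boolean shouldReleaseBeforeCompletion() {
            return releaseBeforeCompletion;
        }

        void beforeCompletion() {
            events.add("unbind");
            holderActive = false;
            if (shouldReleaseBeforeCompletion()) {
                events.add("release(beforeCompletion)");
            }
        }

        void afterCompletion() {
            boolean releaseNecessary;
            if (holderActive) {
                // Only reached when beforeCompletion never ran on this instance.
                holderActive = false;
                events.add("unbind");
                releaseNecessary = true;
            } else {
                // Mirrors the default shouldReleaseAfterCompletion().
                releaseNecessary = !shouldReleaseBeforeCompletion();
            }
            if (releaseNecessary) {
                events.add("release(afterCompletion)");
            }
            events.add("reset");
        }
    }

    // Runs both completion callbacks in order and returns the recorded events.
    static List<String> run(boolean releaseBeforeCompletion) {
        ModelSync sync = new ModelSync(releaseBeforeCompletion);
        sync.beforeCompletion();
        sync.afterCompletion();
        return sync.events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With the defaults the resource is released in beforeCompletion; a subclass that returns `false` from `shouldReleaseBeforeCompletion()` defers the release to afterCompletion instead.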

Looking back, we only reached TransactionSynchronization by way of TransactionSynchronizationManager. Why are the TransactionSynchronization instances kept in TransactionSynchronizationManager, and what is TransactionSynchronizationManager for? Let's take a look.


## TransactionSynchronizationManager

![image-20190918221920971](https://image.baidu.com/search/down?url=http://tva1.sinaimg.cn/large/0081Kckwly1gki3r7w4xnj30pw0hvmzl.jpg)

> The methods fall roughly into these groups:
> register, unbind, bind, get, and clear.
> Now let's look at the fields.

![image-20190918222051306](https://raw.githubusercontent.com/Gallrax/blog-image/master/uPic/0081Kckwly1gki3r4ysyyj30lo0ah0uf-20230318170735964.jpg)

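> Seeing ThreadLocal gives us a strong hint.
> There are many threads, each thread may need transactional connections from several vendors, and each of those connections is short-lived. The Spring container does support prototype-scoped beans, but how would such instances be passed around, and how should they be handled?
> Passing them through the thread context via the ThreadLocal fields of TransactionSynchronizationManager is probably the most decoupled and elegant approach.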
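
The idea of carrying resources in the thread context can be illustrated with a small stand-alone class. This is only a sketch of the technique: `ThreadBoundResources` and `lookUpFromOtherThread` are hypothetical, and the method names merely echo the bind/get/unbind groups listed above rather than reproduce Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-thread resource registry backed by ThreadLocal.
class ThreadBoundResources {

    // Each thread lazily gets its own private map of key -> resource.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key: " + key);
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    // Looks the key up from a freshly started thread to show that bindings are not shared.
    static Object lookUpFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = getResource(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bindResource("dataSource", "connection-1");
        System.out.println(getResource("dataSource"));
        System.out.println(lookUpFromOtherThread("dataSource"));
        unbindResource("dataSource");
    }
}
```

Code running on the binding thread can find the resource by key without it being passed as a parameter, while other threads see nothing, which is what lets each transaction keep its own connection.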

## Summary

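> AbstractPlatformTransactionManager provides the abstract methods, and subclasses supply the concrete implementations (RabbitTransactionManager, DataSourceTransactionManager).
> TransactionSynchronization provides an interface that third parties can implement and register, so that several parties can take part in one transaction (DB, MQ, and so on; implementations include RabbitResourceSynchronization, RedisTransactionSynchronizer, and SqlSessionSynchronization).
> That raises new questions: when are the AbstractPlatformTransactionManager implementations registered, and when are they looked up?
> When are the TransactionSynchronization implementations registered, bound, and retrieved?
> A follow-up post will explain when the implementations of Spring's abstractions are loaded and processed.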

Source: https://gallrax.github.io/2019/09/17/Spring事务部分源码解析(三)-TransactionSynchronization/
Author: Gallrax
Published: September 17, 2019