当前位置: 首页 > news >正文

@Transactional声明式事务回调编程

文章目录

  • 1. 理论阐述
  • 2. 代码实现
    • 2.1. 问题代码
    • 2.2. 改进方案

本文参考:

事务回调编程

大事务问题

1. 理论阐述

最近在学习数据库事务的过程中,了解到了大事务的危害:

  1. 并发情况下,数据库连接资源容易耗尽
  2. 锁定数据较多,容易造成大量阻塞和锁超时,进而接口超时
  3. 执行时间长,容易造成主从延迟
  4. 回滚所需要的时间变长

那么大事务又是如何产生的呢?

  1. 单个事务操作数据库操作较多
  2. 事务中存在 RPC/MQ 等非 DB 耗时操作
  3. 大量的锁竞争

项目编程中我们经常会用到 Spring 的声明式事务 @Transactional 注解,我去反思了下项目中对事务的使用,还真的存在事务中嵌套 MQ 的用法,比方说本地数据库操作过程中穿插着 ES 写入消息(牺牲直连写入 ES 的时效性,中间加一层 MQ 可以提升容灾性),这就容易产生大事务,整体架构如下:
在这里插入图片描述

在分布式异常场景下这种模式也是有问题的:

比方说数据库操作执行报错,或者 MQ 消息超时,本地事务需要回滚,但是 MQ 消息已经发出去了,没法执行回滚操作,这就没法保证本地事务+MQ的原子性了。

在这里插入图片描述

想一下怎么尽可能避免发送MQ但又需要回滚的场景,其实就是把发MQ消息的时机往后放放,本地事务执行成功了,才发送 MQ 消息,这样子也避免大事务中嵌套 MQ,这在业务上也是可以接受的。

在这里插入图片描述

这种做法底层避免了数据库操作失败,MQ 需要回滚但是没法回滚的困境,但仍然有它的缺点,就是仍然没法保证 “数据库操作 + MQ” 的原子性,比方说下面,数据库事务提交了之后,App 重启或者宕机了,就不会发出 MQ 消息。
这其实涉及到了分布式事务的处理策略,我们当然可以用本地消息表或者其他分布式处理策略如TCC来解决这个问题。

所以这里谈论到的策略其实并不是一种分布式事务的处理方案,重点在于优化代码结构避免长事务,同时尽量保证“数据库操作 + MQ” 的原子性。

在这里插入图片描述

2. 代码实现

2.1. 问题代码

在@Transactional 声明式事务编程中,两个 insert 操作中穿插着发送MQ消息,典型的大事务问题。

@Transactional
public void doTransaction() {log.info("start tx");User user1 = new User();user1.setId(9);user1.setAge(2);user1.setName("jxz");user1.setEmail("111@qq.com");userMapper.insert(user1);log.info("insert user1...");log.info("调用其他 RPC 或者发送 MQ 消息");User user2 = new User();user2.setId(10);user2.setAge(3);user2.setName("jxz");user2.setEmail("111@qq.com");userMapper.insert(user2);log.info("insert user2...");log.info("end tx");
}

那正如前面所说的,我们可以在数据库本地事务提交以后,再去调用 RPC 或者 MQ。这个时候代码结构是需要调整的,如果只是单纯把 RPC 或者 MQ 从 @Transactional 注解声明的方法中抽取出来,后置调用,伪代码如下:

public void doRpcAfterTransaction() {// 原先 @Transactional 声明的数据库操作,事务失效doTransaction();log.info("调用其他 RPC 或者发送 MQ 消息");
}

@Transactional 注解也会失效,因为这属于方法内部调用 @Transactional 声明的方法,Spring 不是拿到的代理对象去调用。此外这种方式还增加了代码的复杂性,改动量太大。

2.2. 改进方案

那么是否存在一种代码改动量较小,能让人一眼看懂,最好在静态上还是内嵌在原来 @Transactional 声明式事务编程中;同时还能在当前事务执行完以后,能够及时回调 RPC/MQ 等第三方调用的。

仍然声明一下,这种方案是为了尽可能保证“本地事务+RPC/MQ”的原子性,并且代码结构简单,并不是分布式事务的解决方案

Spring 提供这样的 SPI 扩展,TransactionSynchronization 就提供事务执行完成以后回调的接口。
在这里插入图片描述

其中包括多个事务回调的拓展点:

在这里插入图片描述

其中 TransactionSynchronization#afterCompletion(int status) 就会根据事务执行结果(成功 commit 或者回滚 rollback),status 入参数代表事务执行状态,其实现就会执行事务后置处理。

这一切都建立在当前方法上下文存在活跃的事务,Spring 也提供了静态方法来让我们调用判断 TransactionSynchronizationManager#isActualTransactionActive()

最终写了个工具类实现代码如下:

package com.jxz.util;import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;/*** @Author jiangxuzhao* @Description* @Date 2024/10/2*/
public class TransactionUtils {/*** 事务后置处理 api,可以优化大事务提升数据库性能,尽量保证“本地事务 + RPC/MQ”的原子性** @param runnable 事务后置处理任务*/public static void doAfterTransaction(Runnable runnable) {if (TransactionSynchronizationManager.isActualTransactionActive()) {TransactionSynchronizationManager.registerSynchronization(new DoTransactionCompletion(runnable));}}/*** 实现 TransactionSynchronization 接口,重写其中的 afterCompletion 方法*/public static class DoTransactionCompletion implements TransactionSynchronization {// 待执行的任务Runnable runnable;public DoTransactionCompletion(Runnable runnable) {this.runnable = runnable;}// 在事务 commit/rollback 以后回调@Overridepublic void afterCompletion(int status) {// 当事务状态是 COMMITTED 时if (status == TransactionSynchronization.STATUS_COMMITTED) {runnable.run();}}}
}

在原先调用的地方修改也很简单:

内嵌在 @Transactional 声明式事务中,甚至连 RPC/MQ 调用的代码位置都不需要变动,内部实现的就是事务执行完成之后的后置回调。

@Transactional
public void doTransaction2() {log.info("start tx");User user1 = new User();user1.setId(13);user1.setAge(2);user1.setName("jxz");user1.setEmail("111@qq.com");userMapper.insert(user1);log.info("insert user1...");TransactionUtils.doAfterTransaction(() ->log.info("afterCommit, 调用其他 RPC 或者发送 MQ 消息"));User user2 = new User();user2.setId(14);user2.setAge(3);user2.setName("jxz");user2.setEmail("111@qq.com");userMapper.insert(user2);log.info("insert user2...");log.info("end tx");
}

http://www.mrgr.cn/news/44873.html

相关文章:

  • SpringBootWeb快速入门!详解如何创建一个简单的SpringBoot项目?
  • Bloom Filter 布隆过滤器
  • 1.两数之和
  • Python中对象obj类型确定最pythonic的方式——isinstance()函数
  • Chrome浏览器调用ActiveX控件--allWebOffice控件功能介绍
  • 联想服务器配置阵列、安装操作系统
  • 【自动驾驶】最近计划看的论文
  • Ajax教程
  • vivado 使用 UltraFast 设计方法系统级设计流程图
  • 图像分割恢复方法
  • 【重学 MySQL】五十九、二进制字符串类型与 JSON 类型
  • 【OpenCV】 Python 图像处理 入门
  • 长空会:儿童身高成长秘籍:彼格高儿童成长奶粉成就孩子美好未来
  • 服务攻防
  • 10.7每日作业
  • 头歌 | 获取最多金币
  • msvcp100.dll丢失怎样修复,6招轻松解决msvcp100.dll丢失问题
  • 在单位里,这6点人情世故一定要谨记
  • 机器学习——大规模语言模型与生成模型
  • BLE MESH学习1-基于沁恒CH582学习