Java 分布式事务规范 JTA 从入门到精通(下)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。

JTA 活动事务交互


前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。


image.png


需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource 释放资源,这意味着应该在 try{}finaly{} 中的 finally 块关闭连接。


JTA Atomikos 实战


了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。


Atomikos UserTransaction 实战


使用 UserTransaction 需要了解 Atomikos 提供的两个类。


UserTransactionImp:这个类实现是 UserTransaction 的实现,内部封装了 TransactionManager。

AtomikosDataSourceBean:这个类是 DataSource 的实现,内部封装了对 XAResource 的相关操作。

由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。


假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。


create table user
(
    id       bigint unsigned auto_increment
        primary key,
    username varchar(20) null,
    password varchar(20) null
)


我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。


<dependency>
    <groupId>jakarta.transaction</groupId>
    <artifactId>jakarta.transaction-api</artifactId>
    <version>1.3.3</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>


先提供一个获取数据源的静态方法。


public class Application {
    private static DataSource getDataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
}


其中代码中的 properties 用于配置 MysqlXADataSource 中连接数据库的属性值。

然后写一个测试方法。


public class Application {
    public static void main(String[] args) throws Exception {
        UserTransaction utx = new UserTransactionImp();
        utx.setTransactionTimeout(60);
        // 开启事务
        utx.begin();
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            conn = getDataSource().getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
            if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    utx.rollback();
                } else {
                    // 正常提交事务
                    utx.commit();
                }
            }
        }
    }
}


可以看到,使用 Atomikos 提供的 UserTransaction 进行事务操作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC 操作前后添加了使用 UserTransaction 开启/结束事务的逻辑。


这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。


Atomikos TransactionManager 实战


如果不想使用 AtomikosDataSourceBean,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager 的实现类 UserTransactionManager 即可,Web 环境也可以切换为 J2eeTransactionManager。示例代码如下。


public class Application {
    public static void main(String[] args) throws Exception {
        TransactionManager tm = new UserTransactionManager();
        tm.begin();
        // 使用 MySQL XADataSource 的实现
        MysqlXADataSource ds = new MysqlXADataSource();
        ds.setURL("jdbc:mysql://127.0.0.1:3306/test");
        ds.setUser("root");
        ds.setPassword("12345678");
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            // 获取 XAResource
            XAConnection xaconn = ds.getXAConnection();
            XAResource xares = xaconn.getXAResource();
            // 从事务管理器中获取事务
            Transaction tx = tm.getTransaction();
            // 事务关联资源
            tx.enlistResource(xares);
            conn = xaconn.getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            ps.executeUpdate();
            // 事务与资源解除关联
            tx.delistResource(xares, XAResource.TMSUCCESS);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先使用事务管理器完成事务
            if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    tm.rollback();
                } else {
                    // 正常提交事务
                    tm.commit();
                }
            }
            // 最后再关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}


代码完全遵循前面的示意图。


先创建 TransactionManager。

然后调用 TransactionManager.begin 方法开启事务。

然后创建 XADataSource 并获取 XAResource。

然后获取 Transaction 并将其与 XResource 关联。

然后就可以按照 JDBC 的正常流程执行 SQL 了。

执行 SQL 后先把断开事务与资源的关联关系。

最后提交事务后再 close JDBC 中的 Statement 和 Connection,避免先 close 导致事务管理器无法与数据库交互。

Spring Atomikos 实战

Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。

首先引入 Spring 相关依赖。


<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>


对于 JTA 来说,配置如下。


@Configuration
@EnableTransactionManagement
public class JTAConfig {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean dataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setTransactionTimeout(300);
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
    @Bean
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        jtaTransactionManager.setUserTransaction(userTransactionManager());
        return jtaTransactionManager;
    }
}


然后定义我们操作数据库的 UserService 如下。


@Service
public class UserService {
    @Autowired
    private DataSource dataSource;
    @Transactional(rollbackFor = Exception.class)
    public void testInsert() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) {
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}


最后运行测试类。


public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.scan("com.zzuhkp.demo");
        context.refresh();
        UserService userService = context.getBean(UserService.class);
        userService.testInsert();
        context.close();
    }
}


成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。


Spring Boot Atomikos 实战


Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>


然后在 application.properties 进行数据源相关配置。


spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test
spring.datasource.username=root
spring.datasource.password=12345678


这样就完成了。


引入 spring-boot-starter-jta-atomikos 之后 Spring 会自动配置 JtaTransactionManager 和 AtomikosDataSourceBean。

引入 spring-boot-starter-jdbc 则是为了引入事务相关依赖与功能特性。

仍然使用上述示例中的 UserService,修改测试类如下。


@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
    @Autowired
    private UserService userService;
    @Override
    public void run(String... args) throws Exception {
        userService.testInsert();
    }
}


可以正常提交事务,如果 UserService 抛出异常则回滚事务。多数据源的情况下手动配置多个 AtomikosDataSourceBean 作为 DataSource bean 即可。

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
2月前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
12天前
|
自然语言处理 Java
Java中的字符集编码入门-增补字符(转载)
本文探讨Java对Unicode的支持及其发展历程。文章详细解析了Unicode字符集的结构,包括基本多语言面(BMP)和增补字符的表示方法,以及UTF-16编码中surrogate pair的使用。同时介绍了代码点和代码单元的概念,并解释了UTF-8的编码规则及其兼容性。
82 60
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
67 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
监控 架构师 Java
Java虚拟机调优的艺术:从入门到精通####
本文作为一篇深入浅出的技术指南,旨在为Java开发者揭示JVM调优的神秘面纱,通过剖析其背后的原理、分享实战经验与最佳实践,引领读者踏上从调优新手到高手的进阶之路。不同于传统的摘要概述,本文将以一场虚拟的对话形式,模拟一位经验丰富的架构师向初学者传授JVM调优的心法,激发学习兴趣,同时概括性地介绍文章将探讨的核心议题——性能监控、垃圾回收优化、内存管理及常见问题解决策略。 ####
|
2月前
|
安全 IDE Java
Java常见规范及易忘点
遵循Java编程规范和注意易忘点是提高代码质量和可维护性的关键。通过规范的命名、格式、注释和合理的代码组织,可以让代码更加清晰和易于维护。同时,注意空指针检查、线程安全、集合框架和字符串操作等常见易忘点,可以减少程序错误,提高运行效率。结合单一职责原则、面向接口编程和合理的异常处理,能够编写出高质量的Java代码。希望本文能够帮助Java开发者提升编码水平,写出更高效、更可靠的代码。
33 2
|
2月前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
2月前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
2月前
|
存储 安全 Java
🌟Java零基础-反序列化:从入门到精通
【10月更文挑战第21天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
95 5
|
2月前
|
存储 NoSQL Java
Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
【10月更文挑战第29天】Java调度任务如何使用分布式锁保证相同任务在一个周期里只执行一次?
111 1