Java 分布式事务规范 JTA 从入门到精通(下)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。

JTA 活动事务交互


前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。


image.png


需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource 释放资源,这意味着应该在 try{}finaly{} 中的 finally 块关闭连接。


JTA Atomikos 实战


了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。


Atomikos UserTransaction 实战


使用 UserTransaction 需要了解 Atomikos 提供的两个类。


UserTransactionImp:这个类实现是 UserTransaction 的实现,内部封装了 TransactionManager。

AtomikosDataSourceBean:这个类是 DataSource 的实现,内部封装了对 XAResource 的相关操作。

由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。


假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。


create table user
(
    id       bigint unsigned auto_increment
        primary key,
    username varchar(20) null,
    password varchar(20) null
)


我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。


<dependency>
    <groupId>jakarta.transaction</groupId>
    <artifactId>jakarta.transaction-api</artifactId>
    <version>1.3.3</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>


先提供一个获取数据源的静态方法。


public class Application {
    private static DataSource getDataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
}


其中代码中的 properties 用于配置 MysqlXADataSource 中连接数据库的属性值。

然后写一个测试方法。


public class Application {
    public static void main(String[] args) throws Exception {
        UserTransaction utx = new UserTransactionImp();
        utx.setTransactionTimeout(60);
        // 开启事务
        utx.begin();
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            conn = getDataSource().getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
            if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    utx.rollback();
                } else {
                    // 正常提交事务
                    utx.commit();
                }
            }
        }
    }
}


可以看到,使用 Atomikos 提供的 UserTransaction 进行事务操作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC 操作前后添加了使用 UserTransaction 开启/结束事务的逻辑。


这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。


Atomikos TransactionManager 实战


如果不想使用 AtomikosDataSourceBean,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager 的实现类 UserTransactionManager 即可,Web 环境也可以切换为 J2eeTransactionManager。示例代码如下。


public class Application {
    public static void main(String[] args) throws Exception {
        TransactionManager tm = new UserTransactionManager();
        tm.begin();
        // 使用 MySQL XADataSource 的实现
        MysqlXADataSource ds = new MysqlXADataSource();
        ds.setURL("jdbc:mysql://127.0.0.1:3306/test");
        ds.setUser("root");
        ds.setPassword("12345678");
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            // 获取 XAResource
            XAConnection xaconn = ds.getXAConnection();
            XAResource xares = xaconn.getXAResource();
            // 从事务管理器中获取事务
            Transaction tx = tm.getTransaction();
            // 事务关联资源
            tx.enlistResource(xares);
            conn = xaconn.getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            ps.executeUpdate();
            // 事务与资源解除关联
            tx.delistResource(xares, XAResource.TMSUCCESS);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先使用事务管理器完成事务
            if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    tm.rollback();
                } else {
                    // 正常提交事务
                    tm.commit();
                }
            }
            // 最后再关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}


代码完全遵循前面的示意图。


先创建 TransactionManager。

然后调用 TransactionManager.begin 方法开启事务。

然后创建 XADataSource 并获取 XAResource。

然后获取 Transaction 并将其与 XResource 关联。

然后就可以按照 JDBC 的正常流程执行 SQL 了。

执行 SQL 后先把断开事务与资源的关联关系。

最后提交事务后再 close JDBC 中的 Statement 和 Connection,避免先 close 导致事务管理器无法与数据库交互。

Spring Atomikos 实战

Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。

首先引入 Spring 相关依赖。


<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>


对于 JTA 来说,配置如下。


@Configuration
@EnableTransactionManagement
public class JTAConfig {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean dataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setTransactionTimeout(300);
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
    @Bean
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        jtaTransactionManager.setUserTransaction(userTransactionManager());
        return jtaTransactionManager;
    }
}


然后定义我们操作数据库的 UserService 如下。


@Service
public class UserService {
    @Autowired
    private DataSource dataSource;
    @Transactional(rollbackFor = Exception.class)
    public void testInsert() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) {
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}


最后运行测试类。


public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.scan("com.zzuhkp.demo");
        context.refresh();
        UserService userService = context.getBean(UserService.class);
        userService.testInsert();
        context.close();
    }
}


成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。


Spring Boot Atomikos 实战


Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>


然后在 application.properties 进行数据源相关配置。


spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test
spring.datasource.username=root
spring.datasource.password=12345678


这样就完成了。


引入 spring-boot-starter-jta-atomikos 之后 Spring 会自动配置 JtaTransactionManager 和 AtomikosDataSourceBean。

引入 spring-boot-starter-jdbc 则是为了引入事务相关依赖与功能特性。

仍然使用上述示例中的 UserService,修改测试类如下。


@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
    @Autowired
    private UserService userService;
    @Override
    public void run(String... args) throws Exception {
        userService.testInsert();
    }
}


可以正常提交事务,如果 UserService 抛出异常则回滚事务。多数据源的情况下手动配置多个 AtomikosDataSourceBean 作为 DataSource bean 即可。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
8月前
|
存储 Oracle Java
java零基础学习者入门课程
本课程为Java零基础入门教程,涵盖环境搭建、变量、运算符、条件循环、数组及面向对象基础,每讲配示例代码与实践建议,助你循序渐进掌握核心知识,轻松迈入Java编程世界。
683 0
|
10月前
|
安全 Java 数据库连接
2025 年最新 Java 学习路线图含实操指南助你高效入门 Java 编程掌握核心技能
2025年最新Java学习路线图,涵盖基础环境搭建、核心特性(如密封类、虚拟线程)、模块化开发、响应式编程、主流框架(Spring Boot 3、Spring Security 6)、数据库操作(JPA + Hibernate 6)及微服务实战,助你掌握企业级开发技能。
1184 3
|
9月前
|
Java
java入门代码示例
本文介绍Java入门基础,包含Hello World、变量类型、条件判断、循环及方法定义等核心语法示例,帮助初学者快速掌握Java编程基本结构与逻辑。
675 0
|
Java API 微服务
2025 年 Java 从入门到精通学习笔记全新版
《Java学习笔记:从入门到精通(2025更新版)》是一本全面覆盖Java开发核心技能的指南,适合零基础到高级开发者。内容包括Java基础(如开发环境配置、核心语法增强)、面向对象编程(密封类、接口增强)、进阶技术(虚拟线程、结构化并发、向量API)、实用类库与框架(HTTP客户端、Spring Boot)、微服务与云原生(容器化、Kubernetes)、响应式编程(Reactor、WebFlux)、函数式编程(Stream API)、测试技术(JUnit 5、Mockito)、数据持久化(JPA、R2DBC)以及实战项目(Todo应用)。
614 5
|
9月前
|
存储 算法 安全
“卧槽,系统又崩了!”——别慌,这也许是你看过最通俗易懂的分布式入门
本文深入解析分布式系统核心机制:数据分片与冗余副本实现扩展与高可用,租约、多数派及Gossip协议保障一致性与容错。探讨节点故障、网络延迟等挑战,揭示CFT/BFT容错原理,剖析规模与性能关系,为构建可靠分布式系统提供理论支撑。
422 2
|
9月前
|
前端开发 Java 数据库连接
帮助新手快速上手的 JAVA 学习路线最详细版涵盖从入门到进阶的 JAVA 学习路线
本Java学习路线涵盖从基础语法、面向对象、异常处理到高级框架、微服务、JVM调优等内容,适合新手入门到进阶,助力掌握企业级开发技能,快速成为合格Java开发者。
1315 3
|
10月前
|
NoSQL Java 关系型数据库
Java 从入门到进阶完整学习路线图规划与实战开发最佳实践指南
本文为Java开发者提供从入门到进阶的完整学习路线图,涵盖基础语法、面向对象、数据结构与算法、并发编程、JVM调优、主流框架(如Spring Boot)、数据库操作(MySQL、Redis)、微服务架构及云原生开发等内容,并结合实战案例与最佳实践,助力高效掌握Java核心技术。
1051 2
|
10月前
|
Java 测试技术 API
Java IO流(二):文件操作与NIO入门
本文详解Java NIO与传统IO的区别与优势,涵盖Path、Files类、Channel、Buffer、Selector等核心概念,深入讲解文件操作、目录遍历、NIO实战及性能优化技巧,适合处理大文件与高并发场景,助力高效IO编程与面试准备。
|
10月前
|
Java 编译器 API
Java Lambda表达式与函数式编程入门
Lambda表达式是Java 8引入的重要特性,简化了函数式编程的实现方式。它通过简洁的语法替代传统的匿名内部类,使代码更清晰、易读。本文深入讲解Lambda表达式的基本语法、函数式接口、方法引用等核心概念,并结合集合操作、线程处理、事件回调等实战案例,帮助开发者掌握现代Java编程技巧。同时,还解析了面试中高频出现的相关问题,助你深入理解其原理与应用场景。
|
10月前
|
前端开发 Java 数据库
Java 项目实战从入门到精通 :Java Web 在线商城项目开发指南
本文介绍了一个基于Java Web的在线商城项目,涵盖技术方案与应用实例。项目采用Spring、Spring MVC和MyBatis框架,结合MySQL数据库,实现商品展示、购物车、用户注册登录等核心功能。通过Spring Boot快速搭建项目结构,使用JPA进行数据持久化,并通过Thymeleaf模板展示页面。项目结构清晰,适合Java Web初学者学习与拓展。
607 1

热门文章

最新文章