Java 分布式事务规范 JTA 从入门到精通(下)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。

JTA 活动事务交互


前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。


image.png


需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource 释放资源,这意味着应该在 try{}finaly{} 中的 finally 块关闭连接。


JTA Atomikos 实战


了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。


Atomikos UserTransaction 实战


使用 UserTransaction 需要了解 Atomikos 提供的两个类。


UserTransactionImp:这个类实现是 UserTransaction 的实现,内部封装了 TransactionManager。

AtomikosDataSourceBean:这个类是 DataSource 的实现,内部封装了对 XAResource 的相关操作。

由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。


假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。


create table user
(
    id       bigint unsigned auto_increment
        primary key,
    username varchar(20) null,
    password varchar(20) null
)


我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。


<dependency>
    <groupId>jakarta.transaction</groupId>
    <artifactId>jakarta.transaction-api</artifactId>
    <version>1.3.3</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>


先提供一个获取数据源的静态方法。


public class Application {
    private static DataSource getDataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
}


其中代码中的 properties 用于配置 MysqlXADataSource 中连接数据库的属性值。

然后写一个测试方法。


public class Application {
    public static void main(String[] args) throws Exception {
        UserTransaction utx = new UserTransactionImp();
        utx.setTransactionTimeout(60);
        // 开启事务
        utx.begin();
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            conn = getDataSource().getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
            if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    utx.rollback();
                } else {
                    // 正常提交事务
                    utx.commit();
                }
            }
        }
    }
}


可以看到,使用 Atomikos 提供的 UserTransaction 进行事务操作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC 操作前后添加了使用 UserTransaction 开启/结束事务的逻辑。


这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。


Atomikos TransactionManager 实战


如果不想使用 AtomikosDataSourceBean,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager 的实现类 UserTransactionManager 即可,Web 环境也可以切换为 J2eeTransactionManager。示例代码如下。


public class Application {
    public static void main(String[] args) throws Exception {
        TransactionManager tm = new UserTransactionManager();
        tm.begin();
        // 使用 MySQL XADataSource 的实现
        MysqlXADataSource ds = new MysqlXADataSource();
        ds.setURL("jdbc:mysql://127.0.0.1:3306/test");
        ds.setUser("root");
        ds.setPassword("12345678");
        Connection conn = null;
        PreparedStatement ps = null;
        boolean error = false;
        try {
            // 获取 XAResource
            XAConnection xaconn = ds.getXAConnection();
            XAResource xares = xaconn.getXAResource();
            // 从事务管理器中获取事务
            Transaction tx = tm.getTransaction();
            // 事务关联资源
            tx.enlistResource(xares);
            conn = xaconn.getConnection();
            ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
            ps.executeUpdate();
            // 事务与资源解除关联
            tx.delistResource(xares, XAResource.TMSUCCESS);
        } catch (Exception e) {
            error = true;
        } finally {
            // 先使用事务管理器完成事务
            if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
                if (error) {
                    // 遇到异常回滚事务
                    tm.rollback();
                } else {
                    // 正常提交事务
                    tm.commit();
                }
            }
            // 最后再关闭 JDBC 中的 Statement 和 Connection
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}


代码完全遵循前面的示意图。


先创建 TransactionManager。

然后调用 TransactionManager.begin 方法开启事务。

然后创建 XADataSource 并获取 XAResource。

然后获取 Transaction 并将其与 XResource 关联。

然后就可以按照 JDBC 的正常流程执行 SQL 了。

执行 SQL 后先把断开事务与资源的关联关系。

最后提交事务后再 close JDBC 中的 Statement 和 Connection,避免先 close 导致事务管理器无法与数据库交互。

Spring Atomikos 实战

Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。

首先引入 Spring 相关依赖。


<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>


对于 JTA 来说,配置如下。


@Configuration
@EnableTransactionManagement
public class JTAConfig {
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean dataSource() {
        Properties properties = new Properties();
        properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
        properties.put("user", "root");
        properties.put("password", "12345678");
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        ds.setXaProperties(properties);
        ds.setUniqueResourceName("resourceName");
        ds.setPoolSize(10);
        ds.setBorrowConnectionTimeout(60);
        return ds;
    }
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setTransactionTimeout(300);
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
    @Bean
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(userTransactionManager());
        jtaTransactionManager.setUserTransaction(userTransactionManager());
        return jtaTransactionManager;
    }
}


然后定义我们操作数据库的 UserService 如下。


@Service
public class UserService {
    @Autowired
    private DataSource dataSource;
    @Transactional(rollbackFor = Exception.class)
    public void testInsert() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) {
            int count = ps.executeUpdate();
            System.out.println("count" + count);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}


最后运行测试类。


public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.scan("com.zzuhkp.demo");
        context.refresh();
        UserService userService = context.getBean(UserService.class);
        userService.testInsert();
        context.close();
    }
}


成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。


Spring Boot Atomikos 实战


Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>


然后在 application.properties 进行数据源相关配置。


spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test
spring.datasource.username=root
spring.datasource.password=12345678


这样就完成了。


引入 spring-boot-starter-jta-atomikos 之后 Spring 会自动配置 JtaTransactionManager 和 AtomikosDataSourceBean。

引入 spring-boot-starter-jdbc 则是为了引入事务相关依赖与功能特性。

仍然使用上述示例中的 UserService,修改测试类如下。


@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
    @Autowired
    private UserService userService;
    @Override
    public void run(String... args) throws Exception {
        userService.testInsert();
    }
}


可以正常提交事务,如果 UserService 抛出异常则回滚事务。多数据源的情况下手动配置多个 AtomikosDataSourceBean 作为 DataSource bean 即可。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6天前
|
Java 应用服务中间件 数据库连接
Java 新手入门:Spring Boot 启动揭秘,小白也能秒懂的超详细指南
Java 新手入门:Spring Boot 启动揭秘,小白也能秒懂的超详细指南
24 2
|
6天前
|
Java 测试技术 API
Java 新手入门:Java单元测试利器,Mock详解
Java 新手入门:Java单元测试利器,Mock详解
29 1
|
3天前
|
设计模式 前端开发 Java
【前端学java】SpringBootWeb极速入门-分层解耦(03)
【8月更文挑战第13天】SpringBootWeb极速入门-分层解耦(03)
8 2
【前端学java】SpringBootWeb极速入门-分层解耦(03)
|
4天前
|
开发框架 前端开发 Java
【前端学java】SpringBootWeb极速入门-实现一个简单的web页面01
【8月更文挑战第12天】SpringBootWeb极速入门-实现一个简单的web页面01
15 3
【前端学java】SpringBootWeb极速入门-实现一个简单的web页面01
|
4天前
|
JSON 前端开发 Java
【前端学java】SpringBootWeb极速入门-请求参数解析(02)
【8月更文挑战第12天】SpringBootWeb极速入门-请求参数解析(02)
10 1
【前端学java】SpringBootWeb极速入门-请求参数解析(02)
|
6天前
|
存储 NoSQL Java
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
|
1天前
|
前端开发 IDE Java
"揭秘前端转Java的秘径:SpringBoot Web极速入门,掌握分层解耦艺术,让你的后端代码飞起来,你敢来挑战吗?"
【8月更文挑战第19天】面向前端开发者介绍Spring Boot后端开发,通过简化Spring应用搭建,快速实现Web应用。本文以创建“Hello World”应用为例,展示项目基本结构与运行方式。进而深入探讨三层架构(Controller、Service、DAO)下的分层解耦概念,通过员工信息管理示例,演示各层如何协作及依赖注入的使用,以此提升代码灵活性与可维护性。
|
9天前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
31 8
|
7天前
|
存储 Java 编译器
|
6天前
|
Java 测试技术 Spring
Java 新手入门:依赖注入的 N 种姿势,总有一款适合你!
Java 新手入门:依赖注入的 N 种姿势,总有一款适合你!
15 2