JTA 活动事务交互
前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。
需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource 释放资源,这意味着应该在 try{}finaly{} 中的 finally 块关闭连接。
JTA Atomikos 实战
了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。
Atomikos UserTransaction 实战
使用 UserTransaction 需要了解 Atomikos 提供的两个类。
UserTransactionImp:这个类实现是 UserTransaction 的实现,内部封装了 TransactionManager。
AtomikosDataSourceBean:这个类是 DataSource 的实现,内部封装了对 XAResource 的相关操作。
由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。
假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。
create table user ( id bigint unsigned auto_increment primary key, username varchar(20) null, password varchar(20) null )
我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。
<dependency> <groupId>jakarta.transaction</groupId> <artifactId>jakarta.transaction-api</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>com.atomikos</groupId> <artifactId>transactions-jdbc</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency>
先提供一个获取数据源的静态方法。
public class Application { private static DataSource getDataSource() { Properties properties = new Properties(); properties.put("url", "jdbc:mysql://127.0.0.1:3306/test"); properties.put("user", "root"); properties.put("password", "12345678"); AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource"); ds.setXaProperties(properties); ds.setUniqueResourceName("resourceName"); ds.setPoolSize(10); ds.setBorrowConnectionTimeout(60); return ds; } }
其中代码中的 properties 用于配置 MysqlXADataSource 中连接数据库的属性值。
然后写一个测试方法。
public class Application { public static void main(String[] args) throws Exception { UserTransaction utx = new UserTransactionImp(); utx.setTransactionTimeout(60); // 开启事务 utx.begin(); Connection conn = null; PreparedStatement ps = null; boolean error = false; try { conn = getDataSource().getConnection(); ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')"); int count = ps.executeUpdate(); System.out.println("count" + count); } catch (Exception e) { error = true; } finally { // 先关闭 JDBC 中的 Statement 和 Connection if (ps != null) { ps.close(); } if (conn != null) { conn.close(); } if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) { if (error) { // 遇到异常回滚事务 utx.rollback(); } else { // 正常提交事务 utx.commit(); } } } } }
可以看到,使用 Atomikos 提供的 UserTransaction 进行事务操作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC 操作前后添加了使用 UserTransaction 开启/结束事务的逻辑。
这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。
Atomikos TransactionManager 实战
如果不想使用 AtomikosDataSourceBean,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager 的实现类 UserTransactionManager 即可,Web 环境也可以切换为 J2eeTransactionManager。示例代码如下。
public class Application { public static void main(String[] args) throws Exception { TransactionManager tm = new UserTransactionManager(); tm.begin(); // 使用 MySQL XADataSource 的实现 MysqlXADataSource ds = new MysqlXADataSource(); ds.setURL("jdbc:mysql://127.0.0.1:3306/test"); ds.setUser("root"); ds.setPassword("12345678"); Connection conn = null; PreparedStatement ps = null; boolean error = false; try { // 获取 XAResource XAConnection xaconn = ds.getXAConnection(); XAResource xares = xaconn.getXAResource(); // 从事务管理器中获取事务 Transaction tx = tm.getTransaction(); // 事务关联资源 tx.enlistResource(xares); conn = xaconn.getConnection(); ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')"); ps.executeUpdate(); // 事务与资源解除关联 tx.delistResource(xares, XAResource.TMSUCCESS); } catch (Exception e) { error = true; } finally { // 先使用事务管理器完成事务 if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) { if (error) { // 遇到异常回滚事务 tm.rollback(); } else { // 正常提交事务 tm.commit(); } } // 最后再关闭 JDBC 中的 Statement 和 Connection if (ps != null) { ps.close(); } if (conn != null) { conn.close(); } } } }
代码完全遵循前面的示意图。
先创建 TransactionManager。
然后调用 TransactionManager.begin 方法开启事务。
然后创建 XADataSource 并获取 XAResource。
然后获取 Transaction 并将其与 XResource 关联。
然后就可以按照 JDBC 的正常流程执行 SQL 了。
执行 SQL 后先把断开事务与资源的关联关系。
最后提交事务后再 close JDBC 中的 Statement 和 Connection,避免先 close 导致事务管理器无法与数据库交互。
Spring Atomikos 实战
Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。
首先引入 Spring 相关依赖。
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>5.2.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.6.RELEASE</version> </dependency>
对于 JTA 来说,配置如下。
@Configuration @EnableTransactionManagement public class JTAConfig { @Bean(initMethod = "init", destroyMethod = "close") public AtomikosDataSourceBean dataSource() { Properties properties = new Properties(); properties.put("url", "jdbc:mysql://127.0.0.1:3306/test"); properties.put("user", "root"); properties.put("password", "12345678"); AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource"); ds.setXaProperties(properties); ds.setUniqueResourceName("resourceName"); ds.setPoolSize(10); ds.setBorrowConnectionTimeout(60); return ds; } @Bean(initMethod = "init", destroyMethod = "close") public UserTransactionManager userTransactionManager() throws SystemException { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setTransactionTimeout(300); userTransactionManager.setForceShutdown(true); return userTransactionManager; } @Bean public JtaTransactionManager jtaTransactionManager() throws SystemException { JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(); jtaTransactionManager.setTransactionManager(userTransactionManager()); jtaTransactionManager.setUserTransaction(userTransactionManager()); return jtaTransactionManager; } }
然后定义我们操作数据库的 UserService 如下。
@Service public class UserService { @Autowired private DataSource dataSource; @Transactional(rollbackFor = Exception.class) public void testInsert() { try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) { int count = ps.executeUpdate(); System.out.println("count" + count); } catch (SQLException e) { throw new RuntimeException(e); } } }
最后运行测试类。
public class Application { public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.scan("com.zzuhkp.demo"); context.refresh(); UserService userService = context.getBean(UserService.class); userService.testInsert(); context.close(); } }
成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。
Spring Boot Atomikos 实战
Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
然后在 application.properties
进行数据源相关配置。
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test spring.datasource.username=root spring.datasource.password=12345678
这样就完成了。
引入 spring-boot-starter-jta-atomikos 之后 Spring 会自动配置 JtaTransactionManager 和 AtomikosDataSourceBean。
引入 spring-boot-starter-jdbc 则是为了引入事务相关依赖与功能特性。
仍然使用上述示例中的 UserService,修改测试类如下。
@SpringBootApplication public class Application implements CommandLineRunner { public static void main(String[] args) throws Exception { SpringApplication.run(Application.class, args); } @Autowired private UserService userService; @Override public void run(String... args) throws Exception { userService.testInsert(); } }
可以正常提交事务,如果 UserService
抛出异常则回滚事务。多数据源的情况下手动配置多个 AtomikosDataSourceBean
作为 DataSource
bean 即可。