Spring Boot中如何实现分布式事务
今天,我们来探讨一下在Spring Boot中如何实现分布式事务。
一、什么是分布式事务
分布式事务是指在分布式系统中涉及多个数据库或服务的事务操作。它确保这些操作要么全部成功,要么全部失败,从而保持数据的一致性。实现分布式事务的常见方法包括两阶段提交(2PC)、TCC(Try-Confirm-Cancel)补偿事务模式和基于消息队列的事务消息。
二、使用Spring Boot的JTA支持和Atomikos实现分布式事务
在Spring Boot中,我们可以借助JTA和Atomikos来实现基于XA(两阶段提交)的分布式事务。Atomikos是一个Java事务管理器,支持XA和非XA事务。
1. 引入依赖
在pom.xml中添加以下依赖:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--
      No explicit <version> here: the Atomikos starter below already brings in
      transactions-jdbc/-jms/-jta at the version managed by Spring Boot. Pinning
      only transactions-jta to a different major version would mix Atomikos
      releases on the classpath.
    -->
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-jta</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!-- JDBC driver that provides com.mysql.cj.jdbc.MysqlXADataSource. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Other dependencies -->
</dependencies>
2. 配置数据源
在application.yml中配置多个数据源:
# Connection settings for the two MySQL databases that take part in the
# distributed (JTA/XA) transaction.
# NOTE(review): `primary` / `secondary` are custom keys; the configuration class
# shown in this article sets the database name, user and password in code and
# does not read them from here - confirm how these values are meant to be bound.
# NOTE(review): the nesting was reconstructed from a flattened listing; url,
# username and password are assumed to be siblings of `xa` - confirm.
spring:
  jta:
    enabled: true
  datasource:
    primary:
      xa:
        data-source-class-name: com.mysql.cj.jdbc.MysqlXADataSource
      url: jdbc:mysql://localhost:3306/primary_db
      username: root
      password: password
    secondary:
      xa:
        data-source-class-name: com.mysql.cj.jdbc.MysqlXADataSource
      url: jdbc:mysql://localhost:3306/secondary_db
      username: root
      password: password
3. 配置Atomikos数据源和事务管理器
创建配置类来配置Atomikos数据源、JPA实体管理器工厂和事务管理器:
package cn.juwatech.config;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBeanWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DataSourceConfig {
@Bean(name = "primaryDataSource")
public DataSource primaryDataSource() {
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setUniqueResourceName("primary");
xaDataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
xaDataSource.setXaProperties(primaryXaProperties());
return new AtomikosDataSourceBeanWrapper(xaDataSource);
}
@Bean(name = "secondaryDataSource")
public DataSource secondaryDataSource() {
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setUniqueResourceName("secondary");
xaDataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
xaDataSource.setXaProperties(secondaryXaProperties());
return new AtomikosDataSourceBeanWrapper(xaDataSource);
}
@Bean(name = "primaryEntityManager")
public LocalContainerEntityManagerFactoryBean primaryEntityManager() {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(primaryDataSource());
em.setPackagesToScan("cn.juwatech.primary");
em.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
em.setJpaPropertyMap(jpaProperties());
return em;
}
@Bean(name = "secondaryEntityManager")
public LocalContainerEntityManagerFactoryBean secondaryEntityManager() {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(secondaryDataSource());
em.setPackagesToScan("cn.juwatech.secondary");
em.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
em.setJpaPropertyMap(jpaProperties());
return em;
}
@Bean(name = "transactionManager")
public JpaTransactionManager transactionManager() {
JpaTransactionManager tm = new JpaTransactionManager();
tm.setEntityManagerFactory(primaryEntityManager().getObject());
tm.setDataSource(primaryDataSource());
return tm;
}
private Map<String, String> primaryXaProperties() {
Map<String, String> xaProperties = new HashMap<>();
xaProperties.put("databaseName", "primary_db");
xaProperties.put("user", "root");
xaProperties.put("password", "password");
return xaProperties;
}
private Map<String, String> secondaryXaProperties() {
Map<String, String> xaProperties = new HashMap<>();
xaProperties.put("databaseName", "secondary_db");
xaProperties.put("user", "root");
xaProperties.put("password", "password");
return xaProperties;
}
private Map<String, Object> jpaProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put("hibernate.hbm2ddl.auto", "update");
properties.put("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect");
return properties;
}
}
4. 创建实体类和仓库接口
定义两个实体类和对应的仓库接口:
package cn.juwatech.primary;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
 * JPA entity in package {@code cn.juwatech.primary} with a database-generated
 * identifier and a single {@code name} attribute.
 */
@Entity
public class PrimaryEntity {

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String name;

    /** @return the generated identifier, or {@code null} before the entity is persisted */
    public Long getId() {
        return id;
    }

    /** @param id the identifier to set */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the name */
    public String getName() {
        return name;
    }

    /** @param name the name to set */
    public void setName(String name) {
        this.name = name;
    }
}
package cn.juwatech.secondary;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
/**
 * JPA entity in package {@code cn.juwatech.secondary} with a database-generated
 * identifier and a single {@code description} attribute.
 */
@Entity
public class SecondaryEntity {

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String description;

    /** @return the generated identifier, or {@code null} before the entity is persisted */
    public Long getId() {
        return id;
    }

    /** @param id the identifier to set */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the description */
    public String getDescription() {
        return description;
    }

    /** @param description the description to set */
    public void setDescription(String description) {
        this.description = description;
    }
}
package cn.juwatech.primary;
import org.springframework.data.jpa.repository.JpaRepository;
/** Spring Data JPA repository for {@link PrimaryEntity}, keyed by its {@code Long} id. */
public interface PrimaryRepository extends JpaRepository<PrimaryEntity, Long> {}
package cn.juwatech.secondary;
import org.springframework.data.jpa.repository.JpaRepository;
/** Spring Data JPA repository for {@link SecondaryEntity}, keyed by its {@code Long} id. */
public interface SecondaryRepository extends JpaRepository<SecondaryEntity, Long> {}
5. 创建服务层
创建一个服务类,包含跨多个数据库的事务操作:
package cn.juwatech.service;
import cn.juwatech.primary.PrimaryEntity;
import cn.juwatech.primary.PrimaryRepository;
import cn.juwatech.secondary.SecondaryEntity;
import cn.juwatech.secondary.SecondaryRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Demo service that writes one row through each repository inside a single
 * {@code @Transactional} method and then fails on purpose.
 */
@Service
public class TransactionalService {

    private final PrimaryRepository primaryRepository;
    private final SecondaryRepository secondaryRepository;

    /**
     * @param primaryRepository   repository for {@link PrimaryEntity}
     * @param secondaryRepository repository for {@link SecondaryEntity}
     */
    public TransactionalService(PrimaryRepository primaryRepository, SecondaryRepository secondaryRepository) {
        this.primaryRepository = primaryRepository;
        this.secondaryRepository = secondaryRepository;
    }

    /**
     * Saves a {@link PrimaryEntity} and a {@link SecondaryEntity}, then always
     * throws so that the rollback can be observed.
     *
     * @throws RuntimeException always, after both saves have been issued
     */
    @Transactional
    public void executeDistributedTransaction() {
        PrimaryEntity primaryRow = new PrimaryEntity();
        primaryRow.setName("Primary Name");
        primaryRepository.save(primaryRow);

        SecondaryEntity secondaryRow = new SecondaryEntity();
        secondaryRow.setDescription("Secondary Description");
        secondaryRepository.save(secondaryRow);

        // Simulate a failure to verify that the transaction is rolled back.
        throw new RuntimeException("Simulated exception to trigger rollback");
    }
}
6. 创建控制器
在控制器中调用服务层的方法,触发分布式事务:
package cn.juwatech.controller;
import cn.juwatech.service.TransactionalService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that triggers the demo transaction and reports its outcome as
 * plain text.
 */
@RestController
public class TransactionalController {

    private final TransactionalService transactionalService;

    /** @param transactionalService the service whose transactional method is invoked */
    public TransactionalController(TransactionalService transactionalService) {
        this.transactionalService = transactionalService;
    }

    /**
     * Handles {@code GET /transaction}.
     *
     * @return a success message, or {@code "Transaction failed: "} followed by the
     *         exception message when the service call throws
     */
    @GetMapping("/transaction")
    public String executeTransaction() {
        String outcome;
        try {
            transactionalService.executeDistributedTransaction();
            outcome = "Transaction executed successfully";
        } catch (Exception failure) {
            // The failure is reported in the response body rather than propagated.
            outcome = "Transaction failed: " + failure.getMessage();
        }
        return outcome;
    }
}
通过本文,我们了解了如何在Spring Boot项目中结合JTA和Atomikos实现分布式事务。分布式事务保证了多个数据库操作之间的数据一致性,是分布式系统中重要的一致性保障机制。