atomikos多数据源配置-在工作流(activiti)分库时的事务管理实战

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 近期开发一个工作流(activiti)项目,为了方便扩展,想把activiti的数据库和业务数据库分离。那么在项目里面需要针对两个数据库分别定义数据源。注意:下文的atomikos只能解决工作流项目在单应用单服务节点时候的事务问题。微服务架构下需要采用别的分布式管理框架,比如使用seata,那么此时需要还原成 下文中最基本的`配置多个数据源`的方式

配置多个数据源

yml配置两个数据源, act和business:

datasource:
    act:
      jdbcUrl: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driverClassName: com.mysql.jdbc.Driver
    business:
      jdbc-url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver

分别对应到两个数据源的配置:

package com.zhirui.lmwy.flow.config;

import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认, 我们也无法去修改源码故默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class FlowDatasourceConfig extends AbstractProcessEngineAutoConfiguration {
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.act")
    @Qualifier("activitiDataSource")
    public DataSource activitiDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.business")
    @Qualifier("businessDataSource")
    public DataSource businessDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }
}

业务的springdata配置和事物管理器配置

package com.zhirui.lmwy.flow.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "flowTransactionManager"
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setDataSource(businessDataSource);
        //配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();
        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

    /**
     * 创建事务管理器
     */
    @Primary
    @Bean
    public PlatformTransactionManager flowTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(flowEntityManager().getObject());
        return transactionManager;
    }
}

activiti中使用act数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    private final DataSource dataSource;

    @Autowired
    @Qualifier("activitiDataSource")
    private DataSource activitiDataSource;

    @Bean
    public SpringProcessEngineConfiguration getProcessEngineConfiguration() {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(activitiTransactionManager());
        return config;
    }

    @Bean
    public DataSourceTransactionManager activitiTransactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(activitiDataSource);
        return transactionManager;
    }
}

事物管理问题

service方法中只能一个生效,business的事务管理器是"flowTransactionManager",且配置了@Primary组件,所以下面的方法中使用的数据源就是"flowTransactionManager",那么针对数据源act的操作将无法回滚。

@Override
@Transactional(rollbackFor = Exception.class)
public void commit(FlowTaskInstance flowTaskInstance, String tenantId) throws Exception {
    针对数据源act的操作
        针对数据源business的操作
        ...

可以将@Transactional(rollbackFor = Exception.class) 指定事务管理器名称:

@Transactional(rollbackFor = Exception.class, transactionManager = "activitiTransactionManager")

那么方法上使用事物管理器为“activitiTransactionManager”, 但是针对数据源business的操作也将无法回滚。

我们需要一种分布式事物管理器。

* 如果是单应用单节点服务的多数据源事务,可以采用下午的 atomikos实现分布式市委 的方案。

* 如果是微服务架构,那么直接在改基础上整合seata即可!

atomikos实现分布式事务

先了解下spring中事务的管理器

PlatformTransactionManager顶级接口定义了最核心的事务管理方法,下面一层是AbstractPlatformTransactionManager抽象类,实现了PlatformTransactionManager接口的方法并定义了一些抽象方法,供子类拓展。最下面一层是2个经典事务管理器:

1.DataSourceTransactionmanager: 即本地单资源事务管理器,也是spring默认的事务管理器。

2.JtaTransactionManager: 即多资源事务管理器(又叫做分布式事务管理器),其实现了JTA规范,使用XA协议进行两阶段提交。

3. atomikos是JTA规范的具体技术,比较火和流行。

pom配置

<!-- 分布式事务管理 参考:https://blog.csdn.net/qq_35387940/article/details/103474353 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

yaml配置

#datasource
spring:
  jta:
    enabled: true
    atomikos:
      datasource:
        act:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: act
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000
        business:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: business
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000

配置多数据源和事务管理器

package com.zhirui.lmwy.flow.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class AtomikosConfig extends AbstractProcessEngineAutoConfiguration {

    @Primary
    @Bean(name = "actDatasource")
    @Qualifier("actDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.act")
    public DataSource actDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean(name = "businessDatasource")
    @Qualifier("businessDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.business")
    public DataSource businessDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean("jtaTransactionManager")
    @Primary
    public JtaTransactionManager activitiTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }
}

业务jpa绑定数据源

package com.zhirui.lmwy.flow.config;

import org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.HashMap;
import java.util.Map;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "jtaTransactionManager" //指定jta的事务管理器
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDatasource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
//    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        // *** jta 事务管理 ***
//        AtomikosJtaPlatform.setTransactionManager(transactionManager);
//        AtomikosJtaPlatform.setUserTransaction(userTransaction);

        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();

        // *** jta datasource ***
        em.setJtaDataSource(businessDataSource);
//        em.setDataSource(businessDataSource);
        // 配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();

        // *** jta datasource ***
        properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        properties.put("javax.persistence.transactionType", "JTA");

        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

}

activiti绑定数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    @Autowired
    @Qualifier("actDatasource")
    private DataSource activitiDataSource;

    @Bean
    @Primary
    public SpringProcessEngineConfiguration getProcessEngineConfiguration(JtaTransactionManager jtaTransactionManager) {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(jtaTransactionManager);
        return config;
    }

}

测试:

执行任务提交方法,报错后都会进行回滚

public void commit(){
    // 操作1:activiti执行任务,用的act数据源
    taskService.complete(task.getId());
    ...
    // 操作2:更新业务流程对象,保存业务对象,用的business数据源
    flowInstanceDao.save(flowInstance);
    ...
    int a = 100 / 0;
}

参考:

http://www.manongjc.com/detail/6-anzuqtksygeselj.html

https://www.cnblogs.com/xkzhangsanx/p/11218453.html

https://www.jianshu.com/p/099c0850ba16

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
存储 SQL 数据库
在TiDB中创建表
【2月更文挑战第29天】在TiDB中创建表涉及定义字段名、数据类型和约束 。注意规划表结构、选择合适的数据类型和约束,以及谨慎使用索引,以平衡查询速度和写入性能。在实际操作前确保备份数据。
|
XML 缓存 前端开发
Thymeleaf一篇就够了
Thymeleaf是Springboot官方支持的模板引擎,有着动静分离等独有特点,通过本文简单学习下吧!
62100 24
Thymeleaf一篇就够了
|
关系型数据库 MySQL 数据库
docker 安装 mysql 并映射数据库存放路径及配置文件
本文是博主学习docker镜像的记录,希望对大家有所帮助。
4482 0
docker 安装 mysql 并映射数据库存放路径及配置文件
|
7月前
|
人工智能 Java 程序员
【AI程序员】通义灵码 AI 程序员全面上线JAVA使用体验
通过 AI 程序编写一个JAVA后台项目登陆页面
599 42
|
缓存 前端开发 JavaScript
解密前端性能优化:提升网页加载速度的关键策略
网页加载速度不仅影响用户体验,还对搜索引擎排名有直接影响。本文将深入探讨前端性能优化的多种策略,包括资源压缩与合并、延迟加载、CDN加速等。通过具体的优化方法和实际案例分析,读者将能够掌握提升网页性能的实用技巧,并能够在实际项目中有效地应用这些策略来实现更快的加载速度和更流畅的用户体验。
|
Java 数据库连接 mybatis
成功解决: Invalid bound statement (not found) 在已经使用mybatis的项目里引入mybatis-plus,结果不能共存的解决
这篇文章讨论了在已使用MyBatis的项目中引入MyBatis-Plus后出现的"Invalid bound statement (not found)"错误,并提供了解决方法,主要是通过修改yml配置文件来解决MyBatis和MyBatis-Plus共存时的冲突问题。
成功解决: Invalid bound statement (not found) 在已经使用mybatis的项目里引入mybatis-plus,结果不能共存的解决
|
Java
java: 警告: 源发行版 11 需要目标发行版 11”错误解决(解决方案整合)
java: 警告: 源发行版 11 需要目标发行版 11”错误解决(解决方案整合)
3253 0
|
消息中间件 存储 缓存
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的?(上)
|
小程序 JavaScript 容器
微信小程序入门学习02-TDesign中的自定义组件
微信小程序入门学习02-TDesign中的自定义组件
|
机器学习/深度学习 算法 数据处理
【MATLAB】tvfEMD信号分解+FFT+HHT组合算法
【MATLAB】tvfEMD信号分解+FFT+HHT组合算法
440 0

热门文章

最新文章