atomikos多数据源配置-在工作流(activiti)分库时的事务管理实战

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 近期开发一个工作流(activiti)项目,为了方便扩展,想把activiti的数据库和业务数据库分离。那么在项目里面需要针对两个数据库分别定义数据源。注意:下文的atomikos只能解决工作流项目在单应用单服务节点时候的事务问题。微服务架构下需要采用别的分布式管理框架,比如使用seata,那么此时需要还原成 下文中最基本的`配置多个数据源`的方式

配置多个数据源

yml配置两个数据源, act和business:

datasource:
    act:
      jdbcUrl: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driverClassName: com.mysql.jdbc.Driver
    business:
      jdbc-url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver

分别对应到两个数据源的配置:

package com.zhirui.lmwy.flow.config;

import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认, 我们也无法去修改源码故默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class FlowDatasourceConfig extends AbstractProcessEngineAutoConfiguration {
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.act")
    @Qualifier("activitiDataSource")
    public DataSource activitiDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.business")
    @Qualifier("businessDataSource")
    public DataSource businessDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }
}

业务的springdata配置和事物管理器配置

package com.zhirui.lmwy.flow.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "flowTransactionManager"
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setDataSource(businessDataSource);
        //配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();
        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

    /**
     * 创建事务管理器
     */
    @Primary
    @Bean
    public PlatformTransactionManager flowTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(flowEntityManager().getObject());
        return transactionManager;
    }
}

activiti中使用act数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    private final DataSource dataSource;

    @Autowired
    @Qualifier("activitiDataSource")
    private DataSource activitiDataSource;

    @Bean
    public SpringProcessEngineConfiguration getProcessEngineConfiguration() {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(activitiTransactionManager());
        return config;
    }

    @Bean
    public DataSourceTransactionManager activitiTransactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(activitiDataSource);
        return transactionManager;
    }
}

事物管理问题

service方法中只能一个生效,business的事务管理器是"flowTransactionManager",且配置了@Primary组件,所以下面的方法中使用的数据源就是"flowTransactionManager",那么针对数据源act的操作将无法回滚。

@Override
@Transactional(rollbackFor = Exception.class)
public void commit(FlowTaskInstance flowTaskInstance, String tenantId) throws Exception {
    针对数据源act的操作
        针对数据源business的操作
        ...

可以将@Transactional(rollbackFor = Exception.class) 指定事务管理器名称:

@Transactional(rollbackFor = Exception.class, transactionManager = "activitiTransactionManager")

那么方法上使用事物管理器为“activitiTransactionManager”, 但是针对数据源business的操作也将无法回滚。

我们需要一种分布式事物管理器。

* 如果是单应用单节点服务的多数据源事务,可以采用下午的 atomikos实现分布式市委 的方案。

* 如果是微服务架构,那么直接在改基础上整合seata即可!

atomikos实现分布式事务

先了解下spring中事务的管理器

PlatformTransactionManager顶级接口定义了最核心的事务管理方法,下面一层是AbstractPlatformTransactionManager抽象类,实现了PlatformTransactionManager接口的方法并定义了一些抽象方法,供子类拓展。最下面一层是2个经典事务管理器:

1.DataSourceTransactionmanager: 即本地单资源事务管理器,也是spring默认的事务管理器。

2.JtaTransactionManager: 即多资源事务管理器(又叫做分布式事务管理器),其实现了JTA规范,使用XA协议进行两阶段提交。

3. atomikos是JTA规范的具体技术,比较火和流行。

pom配置

<!-- 分布式事务管理 参考:https://blog.csdn.net/qq_35387940/article/details/103474353 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

yaml配置

#datasource
spring:
  jta:
    enabled: true
    atomikos:
      datasource:
        act:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: act
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000
        business:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: business
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000

配置多数据源和事务管理器

package com.zhirui.lmwy.flow.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class AtomikosConfig extends AbstractProcessEngineAutoConfiguration {

    @Primary
    @Bean(name = "actDatasource")
    @Qualifier("actDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.act")
    public DataSource actDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean(name = "businessDatasource")
    @Qualifier("businessDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.business")
    public DataSource businessDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean("jtaTransactionManager")
    @Primary
    public JtaTransactionManager activitiTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }
}

业务jpa绑定数据源

package com.zhirui.lmwy.flow.config;

import org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.HashMap;
import java.util.Map;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "jtaTransactionManager" //指定jta的事务管理器
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDatasource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
//    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        // *** jta 事务管理 ***
//        AtomikosJtaPlatform.setTransactionManager(transactionManager);
//        AtomikosJtaPlatform.setUserTransaction(userTransaction);

        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();

        // *** jta datasource ***
        em.setJtaDataSource(businessDataSource);
//        em.setDataSource(businessDataSource);
        // 配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();

        // *** jta datasource ***
        properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        properties.put("javax.persistence.transactionType", "JTA");

        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

}

activiti绑定数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    @Autowired
    @Qualifier("actDatasource")
    private DataSource activitiDataSource;

    @Bean
    @Primary
    public SpringProcessEngineConfiguration getProcessEngineConfiguration(JtaTransactionManager jtaTransactionManager) {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(jtaTransactionManager);
        return config;
    }

}

测试:

执行任务提交方法,报错后都会进行回滚

public void commit(){
    // 操作1:activiti执行任务,用的act数据源
    taskService.complete(task.getId());
    ...
    // 操作2:更新业务流程对象,保存业务对象,用的business数据源
    flowInstanceDao.save(flowInstance);
    ...
    int a = 100 / 0;
}

参考:

http://www.manongjc.com/detail/6-anzuqtksygeselj.html

https://www.cnblogs.com/xkzhangsanx/p/11218453.html

https://www.jianshu.com/p/099c0850ba16

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6月前
|
存储 缓存 Java
SpringBoot 整合多数据源的事务问题
多数据源实现、切面的优先级问题(Order注解)、事务的传播特性(Propagation类)
|
6月前
|
存储 关系型数据库 MySQL
【mybatis-plus】Springboot+AOP+自定义注解实现多数据源操作(数据源信息存在数据库)
【mybatis-plus】Springboot+AOP+自定义注解实现多数据源操作(数据源信息存在数据库)
|
存储 SQL Java
springboot+jta+atomikos分布式多数据源事务管理 (SQLserver)
在我上一篇博客的项目基础上做更改 springboot多数据源分包式实现 首先我们来试一下没有更改的情况下
|
存储 druid Java
SpringBoot整合atomikos实现跨库事务
框架之前完成了多数据源的动态切换及事务的处理,想更近一步提供一个简单的跨库事务处理功能,经过网上的搜索调研,大致有XA事务/SEGA事务/TCC事务等方案,因为业务主要涉及政府及企业且并发量不大,所以采用XA事务,虽然性能有所损失,但是可以保证数据的强一致性
134 0
SpringBoot整合atomikos实现跨库事务
|
消息中间件 JavaScript 小程序
SpringBoot 多数据源及事务解决方案 上
SpringBoot 多数据源及事务解决方案 上
SpringBoot 多数据源及事务解决方案  上
|
SQL 存储 druid
SpringBoot多数据源事务解决方案
之前有文章提供了springboot多数据源动态注册切换的整合方案,在后续使用过程中,发现在事务控制中有多种bug发生,决定对此问题进行分析与解决
375 0
|
设计模式 Java 关系型数据库
SpringBoot 多数据源及事务解决方案 下
SpringBoot 多数据源及事务解决方案 下
|
druid Java 关系型数据库
springboot2原理实战(16)--jdbc数据源和事务
springboot2原理实战(16)--jdbc数据源和事务
253 0
springboot2原理实战(16)--jdbc数据源和事务
|
druid Java 数据库连接
Springboot 整合druid+mybatis+jta分布式事务+多数据源aop注解动态切换 (一篇到位)
Springboot 整合druid+mybatis+jta分布式事务+多数据源aop注解动态切换 (一篇到位)
395 1
Springboot 整合druid+mybatis+jta分布式事务+多数据源aop注解动态切换 (一篇到位)
|
XML Java 关系型数据库
Spring的事务操作一站式学习【事务的概念、注解声明式事务管理、声明式事务管理参数配置、XML声明式事务管理、完全注解声明式事务管理】(超详细)
Spring的事务操作一站式学习【事务的概念、注解声明式事务管理、声明式事务管理参数配置、XML声明式事务管理、完全注解声明式事务管理】(超详细)
Spring的事务操作一站式学习【事务的概念、注解声明式事务管理、声明式事务管理参数配置、XML声明式事务管理、完全注解声明式事务管理】(超详细)