atomikos多数据源配置-在工作流(activiti)分库时的事务管理实战-阿里云开发者社区

开发者社区> 哮南> 正文

atomikos多数据源配置-在工作流(activiti)分库时的事务管理实战

简介: 近期开发一个工作流(activiti)项目,为了方便扩展,想把activiti的数据库和业务数据库分离。那么在项目里面需要针对两个数据库分别定义数据源。 注意: 下文的atomikos只能解决工作流项目在单应用单服务节点时候的事务问题。 微服务架构下需要采用别的分布式管理框架,比如使用seata,那么此时需要还原成 下文中最基本的`配置多个数据源`的方式
+关注继续查看

配置多个数据源

yml配置两个数据源, act和business:

datasource:
    act:
      jdbcUrl: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driverClassName: com.mysql.jdbc.Driver
    business:
      jdbc-url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver

分别对应到两个数据源的配置:

package com.zhirui.lmwy.flow.config;

import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认, 我们也无法去修改源码故默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class FlowDatasourceConfig extends AbstractProcessEngineAutoConfiguration {
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.act")
    @Qualifier("activitiDataSource")
    public DataSource activitiDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.business")
    @Qualifier("businessDataSource")
    public DataSource businessDataSource() {
        DataSource build = DataSourceBuilder.create().build();
        return build;
    }
}

业务的springdata配置和事物管理器配置

package com.zhirui.lmwy.flow.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "flowTransactionManager"
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setDataSource(businessDataSource);
        //配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();
        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

    /**
     * 创建事务管理器
     */
    @Primary
    @Bean
    public PlatformTransactionManager flowTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(flowEntityManager().getObject());
        return transactionManager;
    }
}

activiti中使用act数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    private final DataSource dataSource;

    @Autowired
    @Qualifier("activitiDataSource")
    private DataSource activitiDataSource;

    @Bean
    public SpringProcessEngineConfiguration getProcessEngineConfiguration() {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(activitiTransactionManager());
        return config;
    }

    @Bean
    public DataSourceTransactionManager activitiTransactionManager() {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(activitiDataSource);
        return transactionManager;
    }
}

事物管理问题

service方法中只能一个生效,business的事务管理器是"flowTransactionManager",且配置了@Primary组件,所以下面的方法中使用的数据源就是"flowTransactionManager",那么针对数据源act的操作将无法回滚。

@Override
@Transactional(rollbackFor = Exception.class)
public void commit(FlowTaskInstance flowTaskInstance, String tenantId) throws Exception {
    针对数据源act的操作
        针对数据源business的操作
        ...

可以将@Transactional(rollbackFor = Exception.class) 指定事务管理器名称:

@Transactional(rollbackFor = Exception.class, transactionManager = "activitiTransactionManager")

那么方法上使用事物管理器为“activitiTransactionManager”, 但是针对数据源business的操作也将无法回滚。

我们需要一种分布式事物管理器。

* 如果是单应用单节点服务的多数据源事务,可以采用下午的 atomikos实现分布式市委 的方案。

* 如果是微服务架构,那么直接在改基础上整合seata即可!

atomikos实现分布式事务

先了解下spring中事务的管理器

PlatformTransactionManager顶级接口定义了最核心的事务管理方法,下面一层是AbstractPlatformTransactionManager抽象类,实现了PlatformTransactionManager接口的方法并定义了一些抽象方法,供子类拓展。最下面一层是2个经典事务管理器:

1.DataSourceTransactionmanager: 即本地单资源事务管理器,也是spring默认的事务管理器。

2.JtaTransactionManager: 即多资源事务管理器(又叫做分布式事务管理器),其实现了JTA规范,使用XA协议进行两阶段提交。

3. atomikos是JTA规范的具体技术,比较火和流行。

pom配置

<!-- 分布式事务管理 参考:https://blog.csdn.net/qq_35387940/article/details/103474353 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

yaml配置

#datasource
spring:
  jta:
    enabled: true
    atomikos:
      datasource:
        act:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product_act?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: act
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000
        business:
          xa-properties.url: jdbc:mysql://localhost:3306/lmwy_product?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&pinGlobalTxToPhysicalConnection=true
          xa-properties.user: root
          xa-properties.password: 123456
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          unique-resource-name: business
          max-pool-size: 10
          min-pool-size: 1
          max-lifetime: 10000
          borrow-connection-timeout: 10000

配置多数据源和事务管理器

package com.zhirui.lmwy.flow.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;


/**
 *  多数据源配置类
 *      1.activiti数据库连接池
 *          默认
 *      2.工作流业务数据库连接池
 *          明确指定 businessDataSource
 */
@Configuration
public class AtomikosConfig extends AbstractProcessEngineAutoConfiguration {

    @Primary
    @Bean(name = "actDatasource")
    @Qualifier("actDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.act")
    public DataSource actDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean(name = "businessDatasource")
    @Qualifier("businessDatasource")
    @ConfigurationProperties(prefix="spring.jta.atomikos.datasource.business")
    public DataSource businessDatasource() {
        return new AtomikosDataSourceBean();
    }

    @Bean("jtaTransactionManager")
    @Primary
    public JtaTransactionManager activitiTransactionManager() throws SystemException {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }
}

业务jpa绑定数据源

package com.zhirui.lmwy.flow.config;

import org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.HashMap;
import java.util.Map;

/**
 * 需要在配置类的上面加上@EnableJpaRepositories(basePackages={"dao层对应的包路径"}),这样jpa的dao层就注入进来了。结果启动spring boot 时发现,又有 Not a managed type: class ******的错误,经查询发现少了jpa entity路径的配置,在配置类的头部加上标记:@EntityScan("entity对应的包路径")
 */
@Configuration
@EnableJpaRepositories(
        basePackages = {"com.zhirui.lmwy.flow.dao"},//代理的dao接口所在的包
        entityManagerFactoryRef = "flowEntityManager",
        transactionManagerRef = "jtaTransactionManager" //指定jta的事务管理器
)
public class JpaRepositoriesConfig {

    @Autowired
    private Environment env;

    @Autowired
    @Qualifier("businessDatasource")
    private DataSource businessDataSource;

    /**
     * 创建entityManagerFactory工厂
     */
    @Bean
//    @Primary
    public LocalContainerEntityManagerFactoryBean flowEntityManager() {
        // *** jta 事务管理 ***
//        AtomikosJtaPlatform.setTransactionManager(transactionManager);
//        AtomikosJtaPlatform.setUserTransaction(userTransaction);

        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();

        // *** jta datasource ***
        em.setJtaDataSource(businessDataSource);
//        em.setDataSource(businessDataSource);
        // 配置扫描的实体类包 ,否则报错:No persistence units parsed from {classpath*:META-INF/persistence.xml}
        em.setPackagesToScan(new String[]{"com.zhirui.lmwy.flow.entity"});
        HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        HashMap<String, Object> properties = new HashMap<>();

        // *** jta datasource ***
        properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        properties.put("javax.persistence.transactionType", "JTA");

        // application.yaml配置文件的ddl-auto的值
//      properties.put("hibernate.hbm2ddl.auto", env.getProperty("hibernate.hbm2ddl.auto"));
        properties.put("hibernate.show_sql", "true");
        properties.put("hibernate.format_sql", "true");
        // application.yaml配置文件的database-platform的值
//      properties.put("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        properties.put("hibernate.implicit_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringImplicitNamingStrategy");
        properties.put("hibernate.physical_naming_strategy", "org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy");
        em.setJpaPropertyMap(properties);
        return em;
    }

}

activiti绑定数据源

package com.zhirui.lmwy.flow.config;

import lombok.AllArgsConstructor;
import org.activiti.spring.SpringProcessEngineConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;

/**
 * Activiti 配置
 */
@Configuration
@AllArgsConstructor
public class ActivitiConfig {

    @Autowired
    @Qualifier("actDatasource")
    private DataSource activitiDataSource;

    @Bean
    @Primary
    public SpringProcessEngineConfiguration getProcessEngineConfiguration(JtaTransactionManager jtaTransactionManager) {
        SpringProcessEngineConfiguration config =
                new SpringProcessEngineConfiguration();
        config.setDataSource(activitiDataSource);
        config.setTransactionManager(jtaTransactionManager);
        return config;
    }

}

测试:

执行任务提交方法,报错后都会进行回滚

public void commit(){
    // 操作1:activiti执行任务,用的act数据源
    taskService.complete(task.getId());
    ...
    // 操作2:更新业务流程对象,保存业务对象,用的business数据源
    flowInstanceDao.save(flowInstance);
    ...
    int a = 100 / 0;
}

参考:

http://www.manongjc.com/detail/6-anzuqtksygeselj.html

https://www.cnblogs.com/xkzhangsanx/p/11218453.html

https://www.jianshu.com/p/099c0850ba16

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
实时计算账号管理的功能有哪些?
本文为您介绍实时计算账号模型、主子账号管理和账号安全验证功能。
438 0
SQL Server 2012实施与管理实战指南(笔记)——Ch5启动SQL Server服务和数据库
5.启动SQL Server服务和数据库 在数据库和服务启动过程中,经常会出现的问题: 1.SQL Server实例无法正常启动 2.系统数据库无法正常启动 3.网络配置失败 4.用户数据库无法启动 5.
980 0
SQL Server 2012实施与管理实战指南(笔记)——Ch4数据库连接组件
4.数据库连接组件 访问数据库有多种不同的技术,包括ADO,ODBC,OLEDB,ADO.NET等这些都有一些共性。首先要建立连接(Connection),然后通过命令(Command)对数据库进行访问,最后把结果集放到dataset或者recordset上。
899 0
(三):C++分布式实时应用框架——系统管理模块
C++分布式实时应用框架——系统管理模块     上篇:(二): 基于ZeroMQ的实时通讯平台   版权声明:本文版权及所用技术归属smartguys团队所有,对于抄袭,非经同意转载等行为保留法律追究的权利!     一个分布式实时系统集群动辄上百台机器,集群的规模已经限定这将是一个”封闭“的系统。
1182 0
配置并运行Activiti Explorer
1、安装Java Jdk并配置环境变量 2、安装tomcat 3、下载Activiti包 解压后,将activiti-explorer.war文件放置到tomcat安装目录的webapps文件夹 重启tomcat 会自动解压activiti-explorer.war文件 浏览以下网址:http://localhost:8080/activiti-explorer 成功后,界面如下: 作者:
1532 0
Spring-Boot + Mybatis 多数据源配置
Spring-Boot + Mybatis 多数据源
8620 0
谈谈Spring Boot 数据源加载及其多数据源简单实现
业务需求 提供所有微服务数据源的图形化维护功能 代码生成可以根据选择的数据源加载表等源信息 数据源管理要支持动态配置,实时生效附录效果图 实现思路 本文提供方法仅供类似简单业务场景,在生产环境和复杂的业务场景 请使用分库分表的中间件(例如mycat)或者框架 sharding-sphere (一直在用)等 先来看Spring 默认的数据源注入策略,如下代码默认的事务管理器在初始化时回去加载数据源实现。
1061 0
iOS夯实:ARC时代的内存管理
iOS夯实:ARC时代的内存管理 什么是ARC Automatic Reference Counting (ARC) is a compiler feature that provides automatic memory management of Objective-C objects. Rather than having to think about retain and release operations [^1] [^1]: Transitioning to ARC Release Notes ARC提供是一个编译器的特性,帮助我们在编译的时候自动插入管理引用计数的代码。
926 0
spring多数据源的配置(转)
C3P0和DBCP的区别   C3P0是一个开源的JDBC连接池,它实现了数据源和JNDI绑定,支持JDBC3规范和JDBC2的标准扩展。目前使用它的开源项目有Hibernate,Spring等。
842 0
+关注
哮南
汪界最强程序汪
9
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载