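Preface:
Most of us have integrated multiple data sources with Spring Boot at some point, in one way or another. Once transactions are involved, though, it becomes a real headache: transactions across data sources either conflict or silently stop working. Most blog posts online are outdated and repeat each other, and very few actually solve the problem. So I decided to share an integration approach that works, from start to finish.
In this post I use AOP annotations to switch data sources dynamically, and integrate jta-atomikos along the way to take care of the annoying transaction problem. The persistence framework is MyBatis and the connection pool is Druid. Both are widely used at the moment, so you can extend this scaffold to suit your own project. (What can be extended? Have a look at my Spring Boot column; you may find something useful there.)
This post is fairly long, but if you follow all of the code through to the end, you can reuse it as a scaffold for multiple data sources plus distributed transactions, and transaction problems across data sources should no longer be a problem for you.
Let's start the integration.
First, take a look at the project directory structure to get a rough idea of what this hands-on integration involves.
Start by preparing two data sources,
and create a user table for later use: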
CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Distributed transactions -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
    <!-- Spring Boot 2.1.x defaults to a fairly recent mysql-connector-java (8.0.x), so the version is lowered here -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    <!-- Druid connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.9</version>
    </dependency>
    <!-- aop starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- MyBatis integration -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Next is the data source configuration in application.yml:
server:
  port: 8077
spring:
  application:
    name: jta-dbsource
  datasource:
    druid:
      mydbone:
        url: jdbc:mysql://localhost:3306/mydbone?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&pinGlobalTxToPhysicalConnection=true&autoReconnect=true
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        # Number of physical connections created at initialization. Initialization happens on an explicit init() call or on the first getConnection().
        initialSize: 5
        # Minimum number of connections in the pool
        minIdle: 5
        # Maximum number of connections in the pool
        maxActive: 10
        # Maximum wait time when acquiring a connection, in milliseconds. Once maxWait is set, a fair lock is enabled by default and concurrency drops somewhat; set useUnfairLock to true to use an unfair lock if needed.
        maxWait: 60000
        # Interval at which the Destroy thread checks connections; a connection idle for at least minEvictableIdleTimeMillis is physically closed.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection may stay idle without being evicted
        minEvictableIdleTimeMillis: 300000
        # SQL used to check whether a connection is valid; it varies by database dialect, e.g. Oracle should use SELECT 1 FROM DUAL
        validationQuery: SELECT 1
        # Recommended to be true: no performance impact and it keeps things safe. When a connection is borrowed and has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to check it.
        testWhileIdle: true
        # Run validationQuery when borrowing a connection; enabling this reduces performance.
        testOnBorrow: false
        # Run validationQuery when returning a connection; enabling this reduces performance.
        testOnReturn: false
        # Whether to automatically reclaim connections that time out
        removeAbandoned: false
        # Timeout (in seconds)
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true
      mydbtwo:
        url: jdbc:mysql://localhost:3306/mydbtwo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&pinGlobalTxToPhysicalConnection=true&autoReconnect=true
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        initialSize: 6
        minIdle: 6
        maxActive: 10
        maxWait: 60000
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        removeAbandoned: false
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true
      # WebStatFilter collects data for web-to-JDBC monitoring.
      web-stat-filter:
        # Whether to enable WebStatFilter; default is true
        enabled: true
        # URL pattern to intercept
        url-pattern: /*
        # Exclude requests for static resources
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
      # Druid has a built-in StatViewServlet for displaying Druid statistics.
      stat-view-servlet:
        # Whether to enable StatViewServlet; default is true
        enabled: true
        # URL pattern to intercept
        url-pattern: /druid/*
        # Allow resetting the statistics
        reset-enable: true
        login-username: myname
        login-password: mypwd
----- Now for the code -----
Pay attention to the comments; much of the key information is explained briefly there:
First create a custom annotation, DataSource.java:
import java.lang.annotation.*;

/**
 * @Author : JCccc
 * @CreateTime : 2019/8/28
 * @Description :
 **/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
    String value() default DataSourceNames.ONE;
}
Then create DataSourceNames.java, which simply gives the data sources names:
/**
 * @Author : JCccc
 * @CreateTime : 2019/8/28
 * @Description :
 **/
public interface DataSourceNames {
    String ONE = "ONE";
    String TWO = "TWO";
}
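To make the role of the annotation concrete, here is a minimal plain-Java sketch of how a value such as @DataSource(DataSourceNames.TWO) can be read back at runtime through reflection. Ds, DemoService and AnnotationLookupSketch are hypothetical stand-ins invented for this illustration, not classes from the project; the only thing it relies on is RUNTIME retention, as declared above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for the article's @DataSource annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Ds {
    String value() default "ONE";
}

// Hypothetical service with one method per case.
class DemoService {
    @Ds
    public void usesDefault() {}

    @Ds("TWO")
    public void usesTwo() {}

    public void unannotated() {}
}

class AnnotationLookupSketch {
    // Returns the annotation value, or "ONE" when the method carries no annotation.
    static String resolveKey(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            Ds ds = method.getAnnotation(Ds.class);
            return ds == null ? "ONE" : ds.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveKey(DemoService.class, "usesDefault"));
        System.out.println(resolveKey(DemoService.class, "usesTwo"));
        System.out.println(resolveKey(DemoService.class, "unannotated"));
    }
}
```

Without RUNTIME retention, getAnnotation would return null and every method would fall back to the default key.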
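PS: I actually wrote these back around August, when I was working on AOP-based data source switching. This time I am building on that and focusing on the transaction problem.
Next, use the custom annotation as the pointcut and fill in the AOP logic for dynamic switching. Create DynamicDataSourceAspect.java: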
import com.test.jtadbsource.dbConfig.DataSourceContextHolder;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description :
 **/
@Aspect
@Component
public class DynamicDataSourceAspect {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Pointcut: every method annotated with DataSource
     */
    @Pointcut("@annotation(com.test.jtadbsource.dbAop.DataSource)")
    public void dataSourcePointCut() {}

    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        DataSource ds = null;
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        // Read the custom annotation
        ds = method.getAnnotation(DataSource.class);
        if (ds == null) {
            // If the custom annotation is absent, fall back to data source mydbone
            DataSourceContextHolder.setDataSourceKey(DataSourceNames.ONE);
            logger.info("set default datasource is " + DataSourceNames.ONE);
        } else {
            // The annotation is present, so switch to the data source it names
            DataSourceContextHolder.setDataSourceKey(ds.value());
            logger.info("set datasource is " + ds.value());
        }
        return point.proceed();
    }

    @After(value = "dataSourcePointCut()")
    public void afterSwitchDS(JoinPoint point) {
        DataSourceContextHolder.clearDataSourceKey();
        logger.info("clean datasource");
    }
}
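Stripped of the AspectJ machinery, the aspect does three things: set the key, run the method, clear the key. The following is a rough plain-Java sketch of that pattern, with a try/finally guaranteeing the key is cleared even when the method throws. SwitchSketch, KEY and withDataSource are hypothetical names used only for this illustration; it is not how Spring AOP is implemented.

```java
import java.util.function.Supplier;

class SwitchSketch {
    // Hypothetical stand-in for the ThreadLocal inside DataSourceContextHolder.
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Set the key, run the body, and always clear the key afterwards.
    static <T> T withDataSource(String key, Supplier<T> body) {
        KEY.set(key);
        try {
            return body.get();
        } finally {
            KEY.remove();
        }
    }

    public static void main(String[] args) {
        // Inside the body the key is visible.
        String seen = withDataSource("TWO", () -> KEY.get());
        System.out.println("inside: " + seen);

        // After the call the key is gone again.
        System.out.println("after: " + KEY.get());

        // The key is also cleared when the body fails.
        try {
            withDataSource("TWO", () -> {
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException e) {
            System.out.println("after failure: " + KEY.get());
        }
    }
}
```

Clearing matters because request threads are usually pooled: a key left behind by one request would otherwise leak into whatever request runs next on the same thread.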
The DataSourceContextHolder.java used above:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description :
 **/
public class DataSourceContextHolder extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    // Set the data source name
    public static void setDataSourceKey(String dbName) {
        contextHolder.set(dbName);
    }

    // Get the data source name
    public static String getDataSourceKey() {
        return contextHolder.get();
    }

    // Clear the data source name
    public static void clearDataSourceKey() {
        contextHolder.remove();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSourceKey();
    }
}
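The holder keeps the key in a ThreadLocal so that concurrent requests do not overwrite each other's choice of data source. A small sketch of that isolation, using a hypothetical class name and only the JDK:

```java
class ThreadLocalIsolationSketch {
    // Hypothetical stand-in for the holder's ThreadLocal.
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Sets a key on the calling thread, then reads the key from a second thread.
    static String keySeenByOtherThread() {
        KEY.set("TWO");
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = KEY.get());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            KEY.remove();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        // The second thread never set a key, so it sees null rather than "TWO".
        System.out.println("other thread saw: " + keySeenByOtherThread());
    }
}
```

The same property means the key does not follow work handed off to another thread; code that runs asynchronously would have to set its own key.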
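OK, at this point the basic framework for dynamic switching is done. Next come the core pieces:
1. DataSourceFactory.java:
Configures each DataSource. They are created with DruidXADataSource, which supports JTA transactions;
wraps each DataSource in its own AtomikosDataSourceBean, so that its transactions are handed to the JTA transaction manager;
overrides the SqlSessionFactory, configuring a separate one for each data source;
configures the overridden sqlSessionTemplate, which ties the SqlSession of whichever data source is actually in use to Spring's transaction mechanism.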
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.test.jtadbsource.dbAop.DataSourceNames;
import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description : Multi-data-source configuration
 **/
@Configuration
@MapperScan(basePackages = DataSourceFactory.BASE_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")
public class DataSourceFactory {

    static final String BASE_PACKAGES = "com.test.jtadbsource.mapper";

    private static final String MAPPER_LOCATION = "classpath:mybatis/mapper/*.xml";

    /***
     * Create the DruidXADataSource for mydbone; @ConfigurationProperties binds its properties automatically
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.mydbone")
    public DataSource druidDataSourceOne() {
        return new DruidXADataSource();
    }

    /***
     * Create the DruidXADataSource for mydbtwo
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.mydbtwo")
    public DataSource druidDataSourceTwo() {
        return new DruidXADataSource();
    }

    /**
     * Create the XA-capable Atomikos data source for mydbone
     */
    @Bean
    public DataSource dataSourceOne(DataSource druidDataSourceOne) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceOne);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        // Each data source must be given a unique resource name
        sourceBean.setUniqueResourceName("mydbone");
        return sourceBean;
    }

    /**
     * Create the XA-capable Atomikos data source for mydbtwo
     */
    @Bean
    public DataSource dataSourceTwo(DataSource druidDataSourceTwo) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceTwo);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        sourceBean.setUniqueResourceName("mydbtwo");
        return sourceBean;
    }

    /**
     * @param dataSourceOne data source mydbone
     * @return the session factory for data source mydbone
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryOne(DataSource dataSourceOne) throws Exception {
        return createSqlSessionFactory(dataSourceOne);
    }

    /**
     * @param dataSourceTwo data source mydbtwo
     * @return the session factory for data source mydbtwo
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryTwo(DataSource dataSourceTwo) throws Exception {
        return createSqlSessionFactory(dataSourceTwo);
    }

    /***
     * sqlSessionTemplate works together with Spring transaction management to make sure the SqlSession actually
     * used is the one associated with the current Spring transaction. It also manages the session lifecycle,
     * including closing, committing or rolling back the session as required by the Spring transaction configuration.
     * @param sqlSessionFactoryOne session factory for data source mydbone
     * @param sqlSessionFactoryTwo session factory for data source mydbtwo
     */
    @Bean
    public CustomSqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryOne,
                                                       SqlSessionFactory sqlSessionFactoryTwo) {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put(DataSourceNames.ONE, sqlSessionFactoryOne);
        sqlSessionFactoryMap.put(DataSourceNames.TWO, sqlSessionFactoryTwo);
        CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(sqlSessionFactoryOne);
        customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }

    /***
     * Build a custom session factory
     * @param dataSource the data source
     * @return the custom session factory
     */
    private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        // Map underscore column names to camelCase properties
        configuration.setMapUnderscoreToCamelCase(true);
        // Log SQL to stdout
        configuration.setLogImpl(StdOutImpl.class);
        factoryBean.setConfiguration(configuration);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        // Location of the mapper XML files
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
        return factoryBean.getObject();
    }
}
The custom CustomSqlSessionTemplate used above (it overrides SqlSessionTemplate):
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;

import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;

public class CustomSqlSessionTemplate extends SqlSessionTemplate {

    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    /**
     * The target factories are passed in as a Map
     * @param targetSqlSessionFactories
     */
    public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
        this.targetSqlSessionFactories = targetSqlSessionFactories;
    }

    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(
                sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true));
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
                                    PersistenceExceptionTranslator exceptionTranslator) {
        super(sqlSessionFactory, executorType, exceptionTranslator);
        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;
        this.sqlSessionProxy = (SqlSession) newProxyInstance(
                SqlSessionFactory.class.getClassLoader(),
                new Class[] { SqlSession.class },
                new SqlSessionInterceptor());
        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }

    // Look up the current session factory through DataSourceContextHolder
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        } else {
            Assert.notNull(targetSqlSessionFactories,
                    "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
            Assert.notNull(defaultTargetSqlSessionFactory,
                    "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");
        }
        return this.sqlSessionFactory;
    }

    @Override
    public Configuration getConfiguration() {
        return this.getSqlSessionFactory().getConfiguration();
    }

    public ExecutorType getExecutorType() {
        return this.executorType;
    }

    public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
        return this.exceptionTranslator;
    }

    /** {@inheritDoc} */
    public <T> T selectOne(String statement) {
        return this.sqlSessionProxy.<T> selectOne(statement);
    }

    /** {@inheritDoc} */
    public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T> selectOne(statement, parameter);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement) {
        return this.sqlSessionProxy.<E> selectList(statement);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
    }

    /** {@inheritDoc} */
    public void select(String statement, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, handler);
    }

    /** {@inheritDoc} */
    public void select(String statement, Object parameter, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, handler);
    }

    /** {@inheritDoc} */
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
    }

    /** {@inheritDoc} */
    public int insert(String statement) {
        return this.sqlSessionProxy.insert(statement);
    }

    /** {@inheritDoc} */
    public int insert(String statement, Object parameter) {
        return this.sqlSessionProxy.insert(statement, parameter);
    }

    /** {@inheritDoc} */
    public int update(String statement) {
        return this.sqlSessionProxy.update(statement);
    }

    /** {@inheritDoc} */
    public int update(String statement, Object parameter) {
        return this.sqlSessionProxy.update(statement, parameter);
    }

    /** {@inheritDoc} */
    public int delete(String statement) {
        return this.sqlSessionProxy.delete(statement);
    }

    /** {@inheritDoc} */
    public int delete(String statement, Object parameter) {
        return this.sqlSessionProxy.delete(statement, parameter);
    }

    /** {@inheritDoc} */
    public <T> T getMapper(Class<T> type) {
        return getConfiguration().getMapper(type, this);
    }

    /** {@inheritDoc} */
    public void commit() {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void commit(boolean force) {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void rollback() {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void rollback(boolean force) {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void close() {
        throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void clearCache() {
        this.sqlSessionProxy.clearCache();
    }

    /** {@inheritDoc} */
    public Connection getConnection() {
        return this.sqlSessionProxy.getConnection();
    }

    /**
     * {@inheritDoc}
     * @since 1.0.2
     */
    public List<BatchResult> flushStatements() {
        return this.sqlSessionProxy.flushStatements();
    }

    /**
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
     * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
     * the {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(
                    CustomSqlSessionTemplate.this.getSqlSessionFactory(),
                    CustomSqlSessionTemplate.this.executorType,
                    CustomSqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (CustomSqlSessionTemplate.this.exceptionTranslator != null
                        && unwrapped instanceof PersistenceException) {
                    Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }
}
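The heart of the template is getSqlSessionFactory(): look up the factory registered under the current key and fall back to the default when nothing matches. Here is a minimal sketch of just that lookup, with plain strings standing in for the factories. RoutingSketch and its methods are hypothetical names for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

class RoutingSketch<T> {
    private final Map<String, T> targets = new HashMap<>();
    private final T defaultTarget;

    RoutingSketch(T defaultTarget) {
        this.defaultTarget = defaultTarget;
    }

    void register(String key, T target) {
        targets.put(key, target);
    }

    // Returns the target registered for the key, or the default when the key is null or unknown.
    T resolve(String key) {
        T target = targets.get(key);
        return target != null ? target : defaultTarget;
    }

    public static void main(String[] args) {
        RoutingSketch<String> routing = new RoutingSketch<>("factoryOne");
        routing.register("ONE", "factoryOne");
        routing.register("TWO", "factoryTwo");

        System.out.println(routing.resolve("TWO"));
        // A null key models a method without the annotation: no key was ever set.
        System.out.println(routing.resolve(null));
    }
}
```

This fallback is why a service method without the annotation still ends up on the first data source: the aspect never runs, the key stays null, and the factory passed to the constructor is used.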
Next is the XA distributed transaction manager, XATransactionManagerConfig.java:
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description : JTA transaction configuration
 **/
@Configuration
@EnableTransactionManagement
public class XATransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // Note: the JTA API expresses this timeout in seconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }

    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
                                                         TransactionManager transactionManager) {
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}
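For readers new to XA, the idea a JTA transaction manager builds on is two-phase commit: ask every resource to prepare, and commit only if all of them agree. The toy simulation below shows the control flow only. It is a deliberately simplified sketch with hypothetical types (Participant, InMemoryResource, TwoPhaseCommitSketch); it is not Atomikos code and leaves out transaction logs, timeouts and recovery.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical participant, loosely modelled on what an XA resource offers.
interface Participant {
    boolean prepare();   // phase 1: vote on whether the work can be committed
    void commit();       // phase 2: make the work permanent
    void rollback();     // phase 2: undo the work
}

class InMemoryResource implements Participant {
    final String name;
    final boolean voteYes;
    String state = "ACTIVE";

    InMemoryResource(String name, boolean voteYes) {
        this.name = name;
        this.voteYes = voteYes;
    }

    @Override
    public boolean prepare() {
        state = voteYes ? "PREPARED" : "FAILED";
        return voteYes;
    }

    @Override
    public void commit() {
        state = "COMMITTED";
    }

    @Override
    public void rollback() {
        state = "ROLLED_BACK";
    }
}

class TwoPhaseCommitSketch {
    // Commits all participants only if every one of them votes yes; otherwise rolls all back.
    static boolean run(List<? extends Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        InMemoryResource one = new InMemoryResource("mydbone", true);
        InMemoryResource two = new InMemoryResource("mydbtwo", false);
        boolean committed = run(Arrays.asList(one, two));
        System.out.println(committed + " " + one.state + " " + two.state);
    }
}
```

In this run the second resource votes no, so neither resource commits. That is the behaviour the rollback tests later in the post rely on: a failure on one data source undoes the insert on the other.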
Then, on the startup class, exclude the auto-configured data source configuration class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class JtadbsourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(JtadbsourceApplication.class, args);
    }
}
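At this point, the annotation-driven AOP integration of multiple data sources with JTA distributed transactions is complete!
Next comes testing: inserting data and rolling back a transaction on a single data source, then switching between data sources for inserts and rolling back a transaction that spans both:
First create the entity class, User.java: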
import lombok.Data;
import lombok.ToString;

/**
 * @Author : JCccc
 * @CreateTime : 2019/10/22
 * @Description :
 **/
@Data
@ToString
public class User {
    private Integer id;
    private String username;
    private Integer age;
}
Then UserMapper.java:
import com.test.jtadbsource.pojo.User;
import org.apache.ibatis.annotations.Mapper;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description :
 **/
@Mapper
public interface UserMapper {
    int insert(User user);
}
Then create TestJtaService.java:
import com.test.jtadbsource.pojo.User;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/9
 * @Description :
 **/
public interface TestJtaService {
    void testInsertUser(User user);

    void testInsertUser2(User user);
}
Then TestJtaServiceImpl.java. Here the custom annotation we created at the start marks which service method uses which data source:
import com.test.jtadbsource.dbAop.DataSource;
import com.test.jtadbsource.dbAop.DataSourceNames;
import com.test.jtadbsource.mapper.UserMapper;
import com.test.jtadbsource.pojo.User;
import com.test.jtadbsource.service.TestJtaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Author : JCccc
 * @CreateTime : 2019/12/9
 * @Description :
 **/
@Service
public class TestJtaServiceImpl implements TestJtaService {

    @Autowired
    UserMapper userMapper;

    // No annotation: no key is set, so the default session factory (mydbone) is used
    public void testInsertUser(User user) {
        int insertNum = userMapper.insert(user);
        System.out.println("Insert succeeded, rows: " + insertNum);
    }

    // Routed to mydbtwo by the aspect
    @DataSource(DataSourceNames.TWO)
    public void testInsertUser2(User user) {
        int insertNum = userMapper.insert(user);
        System.out.println("Insert succeeded, rows: " + insertNum);
    }
}
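Finally, let's write an endpoint and first test the data source side, to check that operating on the different databases works:
Call the endpoint,
and the data is inserted as expected:
Now let's test transaction rollback on a single data source,
Testing it this way, without a manual rollback, works too:
Call the endpoint; the transaction rolls back as expected:
Next, insert into both data sources at the same time:
Call the endpoint; the data is inserted as expected:
Then roll back a transaction across both data sources together:
Call the endpoint:
That's all for this post.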