6. 多库事务处理
6.1 关于事务的理解
首先有必要理解事务的本质。
1.提到Spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。
事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢?它是Spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性。
2.Spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。
- begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的
start transaction
命令,但是在JDBC编程方式中不存在。 - close(事务关闭): Spring事务的close()方法,是把
Connection
对象归还给数据库连接池,与事务无关。 - suspend(事务挂起): Spring中事务挂起的语义是:需要新事务时,将现有的
Connection
保存起来(还有尚未提交的事务),然后创建新的Connection2
,Connection2
提交、回滚、关闭完毕后,再把Connection1
取出来继续执行。 - resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。
实际上,只有commit、rollback、close是在JDBC真实存在的,而其他动作都是应用的语意,而非JDBC事务的真实命令。因此,事务真实存在的方法是:setAutoCommit()
、commit()
、rollback()
。
close()语义为:
- 关闭一个数据库连接,这已经不再是事务的方法了。
使用DataSource并不会执行物理关闭,只是归还给连接池。
6.2 自定义管理事务
为了保证在多个数据源中事务的一致性,我们可以手动管理Connetion
的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。
这可以通过使用装饰器设计模式,对Connection
进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection
和ORM框架的默认事务行为。其整体思路如下图所示:
这里并没有使用前面提到的@SwitchDataSource
,这是因为我们在TransactionAop
中已经执行了lookupKey的切换。
6.2.1 定义多事务注解
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MultiTransaction { String transactionManager() default "multiTransactionManager"; // 默认数据隔离级别,随数据库本身默认值 IsolationLevel isolationLevel() default IsolationLevel.DEFAULT; // 默认为主库数据源 String datasourceId() default "default"; // 只读事务,若有更新操作会抛出异常 boolean readOnly() default false;
业务方法只需使用该注解即可开启事务,datasourceId
指定事务用到的数据源,不指定默认为主库。
6.2.3 包装Connection
自定义事务我们使用包装过的Connection
,屏蔽其中的commit&rollback
方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。
public class ConnectionProxy implements Connection { private final Connection connection; public ConnectionProxy(Connection connection) { this.connection = connection; } @Override public void commit() throws SQLException { // connection.commit(); } public void realCommit() throws SQLException { connection.commit(); } @Override public void close() throws SQLException { //connection.close(); } public void realClose() throws SQLException { if (!connection.getAutoCommit()) { connection.setAutoCommit(true); } connection.close(); } @Override public void rollback() throws SQLException { if(!connection.isClosed()) connection.rollback(); } ... }
这里commit&close
方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction
控制了。
6.2.4 事务上下文管理
public class TransactionHolder { // 是否开启了一个MultiTransaction private boolean isOpen; // 是否只读事务 private boolean readOnly; // 事务隔离级别 private IsolationLevel isolationLevel; // 维护当前线程事务ID和连接关系 private ConcurrentHashMap<String, ConnectionProxy> connectionMap; // 事务执行栈 private Stack<String> executeStack; // 数据源切换栈 private Stack<String> datasourceKeyStack; // 主事务ID private String mainTransactionId; // 执行次数 private AtomicInteger transCount; // 事务和数据源key关系 private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap; }
每开启一个事物,生成一个事务ID并绑定一个ConnectionProxy
。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。
6.2.5 数据源兼容处理
为了不影响原生事务的使用,需要重写getConnection
方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。
@Override public Connection getConnection() throws SQLException { TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get(); if (Objects.isNull(transactionHolder)) { return determineTargetDataSource().getConnection(); } ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap() .get(transactionHolder.getExecuteStack().peek()); if (ConnectionProxy == null) { // 没开跨库事务,直接返回 return determineTargetDataSource().getConnection(); } else { transactionHolder.addCount(); // 开了跨库事务,从当前线程中拿包装过的Connection return ConnectionProxy; } }
6.2.6 切面处理
切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:
package com.github.mtxn.transaction.aop; import com.github.mtxn.application.Application; import com.github.mtxn.transaction.MultiTransactionManager; import com.github.mtxn.transaction.annotation.MultiTransaction; import com.github.mtxn.transaction.context.DataSourceContextHolder; import com.github.mtxn.transaction.support.IsolationLevel; import com.github.mtxn.transaction.support.TransactionHolder; import com.github.mtxn.utils.ExceptionUtils; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; @Aspect @Component @Slf4j @Order(99999) public class MultiTransactionAop { @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)") public void pointcut() { if (log.isDebugEnabled()) { log.debug("start in transaction pointcut..."); } } @Around("pointcut()") public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); // 从切面中获取当前方法 Method method = signature.getMethod(); MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class); if (multiTransaction == null) { return point.proceed(); } IsolationLevel isolationLevel = multiTransaction.isolationLevel(); boolean readOnly = multiTransaction.readOnly(); String prevKey = DataSourceContextHolder.getKey(); MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager()); // 切数据源,如果失败使用默认库 if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed(); // 开启事务栈 TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager); Object proceed; try { proceed = point.proceed(); multiTransactionManager.commit(); } catch (Throwable ex) { log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex); multiTransactionManager.rollback(); throw ExceptionUtils.api(ex, "系统异常:%s", ex.getMessage()); } finally { // 当前事务结束出栈 String transId = multiTransactionManager.getTrans().getExecuteStack().pop(); transactionHolder.getDatasourceKeyStack().pop(); // 恢复上一层事务 DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek()); // 最后回到主事务,关闭此次事务 multiTransactionManager.close(transId); } return proceed; } }
7.总结
本文主要介绍了多数据源管理的解决方案(应用层事务,而非XA二段提交保证),以及对多个库同时操作的事务管理。
需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个JVM进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:Seata)。