最近在工作中接触到了一些数据源切换的任务,需要处理到一些和分布式事务相关的模块,于是今天特意来总结一些关于分布式事务内容的技术要点,方便以后做技术分析的时候能够多考虑一些应用场景。
本地事务
某个业务场景中,需要往同一个数据库的多张表执行写操作,一旦有任意环节出现了异常,所有的写操作都需要执行回滚操作,保证数据的一致性和准确性,这一致性的保证我们一般称之为“事务”。所以也可以说”事务“是保证数据准确和一致的一种手段。
业界常用的数据库如:MySQL,Oracle,SqlServer,Postgre Sql都是支持事务类型的操作,当然也有部分数据库并不支持事务操作,例如说:es。
如何衡量一款数据库是否具备有本地事务的功能,关键要看它是否具备有本地事务的四项基本要素,也就是ACID。
A 原子性
Atomicity
要么全部成功要么全部失败
C 一致性
Consistency
原始数据不会收到事务提交的影响,例如说事务提交失败导致这条数据”不翼而飞“。和事务操作无关的数据不会在事务提交前后有任何变化,均保持一致的特效。
I 隔离性
Isolation
各个事务都是处于不同的线程,各自的事务执行不会相互影响。
D 持久性
Durability
一旦数据发生了提交,数据就一定要持久化到数据库中
分布式事务
分布式事务和本地事务的最大区别在于是否涉及到多个数据源。在现在比较流行的微服务架构场景中,分布式事务一般都是由多个子系统所触发。例如说:请求订单系统接口提交订单,库存系统扣除库存的这么一个操作。
简单的一次操作涉及到了两个子系统之间的操作,而这两个子系统也对因为应着两个不同的数据源, 此时所面对的事务问题就光靠本地事务就没办法解决了。
\
2PC
早期分布式解决手段的一种方案,在业界被称作为2PC(2 Phase Commitment Protocol)
也就是两阶段提交协议。
基本思路为:
(这里我暂时将不同的数据源称之为“参与者”,事务管理器称之为“协调者”)
1.协调者询问所有的参与者是否发送prepare操作成功。
2.如果所有参与者都prepare成功,那么事务协调器就会触发提交操作,否则就是所有参与者均执行回滚。
这里我解释一下prepare操作是怎么回事:
prepare操作实际上是需要程序将需要执行的sql发送给到数据库,此时数据库内部会执行本地事务操作,将其sql记录到日志中,但是由于没有收到提交信号,所以并不会执行。只有当2pc执行到第二阶段,事务协调器给各个数据源发送提交操作的时候,所有的参与者均执行回滚操作的时候才会继续生效执行。
采用2pc技术方案能够提高数据信息的一致性的概率,是一种强一致性的手段。
但是这种手段并不能保证一定的一致性,但是如果在提交commit的时候出现了异常,那么就会出现异常情况,不过这种现象的概率特别低。
应用框架案例:
atomikos框架
XA规范
上边讲解到了2pc方案,那么在jdk内部也会有对应的一套技术规范来统一这种实现手段。这种手段就是XA。
XA其实是X/Open 组织基于2PC的思想提出的一项标准。(注意,这是一套规范)
JTA接口标准
jta接口是一套jdk内置的接口,专门用于处理分布式事务。jta的接口在jdk包目录下边可以看到相关代码:
jdk指定了相关的jta接口标准,不同的数据库厂商可以基于jta接口规范来做2pc的实现:
在jdk内部可以基于xa和jta来实现一个简单的两阶段提交案例代码,
通过实际的代码案例要比上边的理论讲解更加实际:
package org.idea.spring.jdbc.xa; import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource; import org.idea.spring.jdbc.xa.bean.MyXid; import javax.sql.XAConnection; import javax.sql.XADataSource; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; /** * @author idea * @date created in 11:18 上午 2020/10/30 */ public class JTADatasourceDemo { private static final String INSERT_SQL_1 = "INSERT INTO `test-db01`.`t_order_1` ( `order_no`, `product_id`, `user_id`, `create_time`, `update_time` ) VALUES ( 39241, 1556586, 2662, now(), now() )"; private static final String INSERT_SQL_2 = "INSERT INTO `test-db01`.`t_order_2`( `order_no`, `product_id`, `user_id`, `create_time`, `update_time`) VALUES ( 3562, 1137131, 3711, now(), now())"; public static void main(String[] args) throws SQLException, XAException { //往表order_1表写入数据 XADataSource xaDataSource_1 = getDatasource(DataSourceConfig.DATA_SOURCE_1); XAConnection xaConnection_1 = getXAConnection(xaDataSource_1); Connection connection_1 = getConnection(xaConnection_1); connection_1.setAutoCommit(false); Statement statement_1 = connection_1.createStatement(); XAResource xaResource_1 = xaConnection_1.getXAResource(); Xid xid_1 = new MyXid(100, new byte[]{0x07}, new byte[]{0x05}); xaResource_1.start(xid_1, XAResource.TMNOFLAGS); statement_1.executeUpdate(INSERT_SQL_1); xaResource_1.end(xid_1, XAResource.TMSUCCESS); int ret_1 = xaResource_1.prepare(xid_1); //往order_2表写入数据 XADataSource xaDataSource_2 = getDatasource(DataSourceConfig.DATA_SOURCE_2); XAConnection xaConnection_2 = getXAConnection(xaDataSource_2); Connection connection_2 = getConnection(xaConnection_2); connection_2.setAutoCommit(false); Statement statement_2 = connection_2.createStatement(); XAResource xaResource_2 = xaConnection_2.getXAResource(); Xid xid_2 = new MyXid(100, new byte[]{0x07}, new byte[]{0x06}); xaResource_2.start(xid_2, XAResource.TMNOFLAGS); statement_2.executeUpdate(INSERT_SQL_2); xaResource_2.end(xid_2, XAResource.TMSUCCESS); int ret_2 = xaResource_2.prepare(xid_2); if (XAResource.XA_OK == ret_2 && XAResource.XA_OK == ret_1) { System.out.println("both success"); xaResource_1.commit(xid_1, false); xaResource_2.commit(xid_2, false); } else { if(XAResource.XA_OK != ret_1){ xaResource_1.rollback(xid_1); } if(XAResource.XA_OK != ret_2){ xaResource_2.rollback(xid_2); } System.out.println("has error,need row back"); } closeConnection(connection_1); closeConnection(connection_2); } /** * 获取请求链接 * * @param dataSourceConfig * @return */ public static XADataSource getDatasource(DataSourceConfig dataSourceConfig) { MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setPassword(dataSourceConfig.getPassword()); dataSource.setUrl(dataSourceConfig.getUrl()); dataSource.setUser(dataSourceConfig.getUsername()); return dataSource; } public static Connection getConnection(XAConnection xaConnection) { Connection connection = null; try { connection = xaConnection.getConnection(); return connection; } catch (Exception e) { e.printStackTrace(); } return connection; } public static XAConnection getXAConnection(XADataSource xaDataSource) { XAConnection xaConnection = null; try { xaConnection = xaDataSource.getXAConnection(); } catch (Exception e) { e.printStackTrace(); } return xaConnection; } public static void closeConnection(Connection conn) { try { conn.close(); } catch (SQLException e) { System.out.println("连接关闭失败"); } } } //数据源的配置用枚举替代 package org.idea.spring.jdbc.xa; import lombok.AllArgsConstructor; import lombok.Getter; /** * @author idea * @date created in 11:19 上午 2020/10/30 */ @AllArgsConstructor @Getter public enum DataSourceConfig { DATA_SOURCE_1("jdbc:mysql://127.0.0.1:3306/test-db01","root","yourpassword"), DATA_SOURCE_2("jdbc:mysql://127.0.0.1:3306/test-db02","root","yourpassword"); private String url; private String username; private String password; } 复制代码
MySQL是如何实现XA规范
这里面会涉及到关于mysql的日志相关技术知识点。
mysql内部的binlog中会记录binlog_event,如果涉及到了和事务相关的操作就会记录一项xid_event信息,该事件会记录事务的xid。
当有事务执行prepare的时候,会先将数据写入到redolog(此时xid也会被写入到redolog),直到事务提交后再写入到binlog里面。
ps:redo log会将一些修改过的数据记录下来,防止数据还未实际刷入到磁盘中发生丢失。redo log的 目的在于数据备份+减少写磁盘压力。关于写入的数据如何从 buffer 中再去写入到 redo log 中可以 通过 innodb_flush_log_at_trx_Commit 参数来调节,本文就不做过多讲述。 复制代码
2PC的一些异常场景
1.在一些预料不到的情况下,假设发生了这种情形,第一到第二阶段中途,资源全部提交了。
此时会导致第二阶段的所有资源都提交失败,并且提示“已经提交”
解决思路:对于所有事务统一标记为已经提交。
2.第一到第二阶段中途,资源全部回滚了。
此时会导致第二阶段的所有资源都回滚了,并且提示“已经回滚”。
解决思路:对于所有事务统一标记为已经回滚。
3.第一到第二阶段中途,资源全部回滚了。
此时会导致第二阶段的部分资源回滚了,并且提示“已经回滚”。
解决思路:需要自行在业务系统中进行实现和设计。
关于提交异常这块可以去了解下Heuristic Decision(启发式决策)关键字。
如果想要自己实现一个可靠的DTP就需要考虑以下几个要点;
如何获取TM? 如何启动和结束一个事务 如何标识一个事务 如何保存和传递事务上下文 应用如何通过资源管理器操作共享资源 资源管理器如何实现准备阶段以及与提交阶段的逻辑 如何实现两阶段提交协议 如何实现在异常情况下进行事务恢复 复制代码
实现起来,确实还是挺复杂的。
2PC的缺点
比较占用资源,多个事务共同提交的时候需要进行等待,容易发生堵塞情况。
采用JTA这种方式的大事务手段,在一些访问量较高的场景中对于系统的资源消耗比较厉害,容易出现资源竞争系统激烈情况。
\
关于分布式事务的 tcc 专题涉及知识点比较多,后续专门出一篇文章进行整理。