分布式事务之2PC

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 分布式事务之2PC

最近在工作中接触到了一些数据源切换的任务,需要处理到一些和分布式事务相关的模块,于是今天特意来总结一些关于分布式事务内容的技术要点,方便以后做技术分析的时候能够多考虑一些应用场景。


本地事务



某个业务场景中,需要往同一个数据库的多张表执行写操作,一旦有任意环节出现了异常,所有的写操作都需要执行回滚操作,保证数据的一致性和准确性,这一致性的保证我们一般称之为“事务”。所以也可以说”事务“是保证数据准确和一致的一种手段。

业界常用的数据库如:MySQL,Oracle,SqlServer,Postgre Sql都是支持事务类型的操作,当然也有部分数据库并不支持事务操作,例如说:es。


如何衡量一款数据库是否具备有本地事务的功能,关键要看它是否具备有本地事务的四项基本要素,也就是ACID。


A 原子性

Atomicity

要么全部成功要么全部失败

C 一致性

Consistency


原始数据不会收到事务提交的影响,例如说事务提交失败导致这条数据”不翼而飞“。和事务操作无关的数据不会在事务提交前后有任何变化,均保持一致的特效。


I 隔离性

Isolation


各个事务都是处于不同的线程,各自的事务执行不会相互影响。


D 持久性

Durability

一旦数据发生了提交,数据就一定要持久化到数据库中


分布式事务



分布式事务和本地事务的最大区别在于是否涉及到多个数据源。在现在比较流行的微服务架构场景中,分布式事务一般都是由多个子系统所触发。例如说:请求订单系统接口提交订单,库存系统扣除库存的这么一个操作。


简单的一次操作涉及到了两个子系统之间的操作,而这两个子系统也对因为应着两个不同的数据源, 此时所面对的事务问题就光靠本地事务就没办法解决了。


\

2PC


早期分布式解决手段的一种方案,在业界被称作为2PC(2 Phase Commitment Protocol)


也就是两阶段提交协议。


基本思路为:


(这里我暂时将不同的数据源称之为“参与者”,事务管理器称之为“协调者”)


1.协调者询问所有的参与者是否发送prepare操作成功。


2.如果所有参与者都prepare成功,那么事务协调器就会触发提交操作,否则就是所有参与者均执行回滚。


网络异常,图片无法展示
|


这里我解释一下prepare操作是怎么回事:


prepare操作实际上是需要程序将需要执行的sql发送给到数据库,此时数据库内部会执行本地事务操作,将其sql记录到日志中,但是由于没有收到提交信号,所以并不会执行。只有当2pc执行到第二阶段,事务协调器给各个数据源发送提交操作的时候,所有的参与者均执行回滚操作的时候才会继续生效执行。


采用2pc技术方案能够提高数据信息的一致性的概率,是一种强一致性的手段。


但是这种手段并不能保证一定的一致性,但是如果在提交commit的时候出现了异常,那么就会出现异常情况,不过这种现象的概率特别低。


应用框架案例:


atomikos框架


XA规范


上边讲解到了2pc方案,那么在jdk内部也会有对应的一套技术规范来统一这种实现手段。这种手段就是XA。


XA其实是X/Open 组织基于2PC的思想提出的一项标准。(注意,这是一套规范)


JTA接口标准


jta接口是一套jdk内置的接口,专门用于处理分布式事务。jta的接口在jdk包目录下边可以看到相关代码:


网络异常,图片无法展示
|


jdk指定了相关的jta接口标准,不同的数据库厂商可以基于jta接口规范来做2pc的实现:


网络异常,图片无法展示
|


在jdk内部可以基于xa和jta来实现一个简单的两阶段提交案例代码,


通过实际的代码案例要比上边的理论讲解更加实际:


package org.idea.spring.jdbc.xa;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.idea.spring.jdbc.xa.bean.MyXid;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
/**
 * @author idea
 * @date created in 11:18 上午 2020/10/30
 */
public class JTADatasourceDemo {
    private static final String INSERT_SQL_1 = "INSERT INTO `test-db01`.`t_order_1` (  `order_no`, `product_id`, `user_id`, `create_time`, `update_time` ) VALUES ( 39241, 1556586, 2662, now(), now() )";
    private static final String INSERT_SQL_2 = "INSERT INTO `test-db01`.`t_order_2`( `order_no`, `product_id`, `user_id`, `create_time`, `update_time`) VALUES ( 3562, 1137131, 3711, now(), now())";
    public static void main(String[] args) throws SQLException, XAException {
        //往表order_1表写入数据
        XADataSource xaDataSource_1 = getDatasource(DataSourceConfig.DATA_SOURCE_1);
        XAConnection xaConnection_1 = getXAConnection(xaDataSource_1);
        Connection connection_1 = getConnection(xaConnection_1);
        connection_1.setAutoCommit(false);
        Statement statement_1 = connection_1.createStatement();
        XAResource xaResource_1 = xaConnection_1.getXAResource();
        Xid xid_1 = new MyXid(100, new byte[]{0x07}, new byte[]{0x05});
        xaResource_1.start(xid_1, XAResource.TMNOFLAGS);
        statement_1.executeUpdate(INSERT_SQL_1);
        xaResource_1.end(xid_1, XAResource.TMSUCCESS);
        int ret_1 = xaResource_1.prepare(xid_1);
        //往order_2表写入数据
        XADataSource xaDataSource_2 = getDatasource(DataSourceConfig.DATA_SOURCE_2);
        XAConnection xaConnection_2 = getXAConnection(xaDataSource_2);
        Connection connection_2 = getConnection(xaConnection_2);
        connection_2.setAutoCommit(false);
        Statement statement_2 = connection_2.createStatement();
        XAResource xaResource_2 = xaConnection_2.getXAResource();
        Xid xid_2 = new MyXid(100, new byte[]{0x07}, new byte[]{0x06});
        xaResource_2.start(xid_2, XAResource.TMNOFLAGS);
        statement_2.executeUpdate(INSERT_SQL_2);
        xaResource_2.end(xid_2, XAResource.TMSUCCESS);
        int ret_2 = xaResource_2.prepare(xid_2);
        if (XAResource.XA_OK == ret_2 && XAResource.XA_OK == ret_1) {
            System.out.println("both success");
            xaResource_1.commit(xid_1, false);
            xaResource_2.commit(xid_2, false);
        } else {
            if(XAResource.XA_OK != ret_1){
                xaResource_1.rollback(xid_1);
            }
            if(XAResource.XA_OK != ret_2){
                xaResource_2.rollback(xid_2);
            }
            System.out.println("has error,need row back");
        }
        closeConnection(connection_1);
        closeConnection(connection_2);
    }
    /**
     * 获取请求链接
     *
     * @param dataSourceConfig
     * @return
     */
    public static XADataSource getDatasource(DataSourceConfig dataSourceConfig) {
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setPassword(dataSourceConfig.getPassword());
        dataSource.setUrl(dataSourceConfig.getUrl());
        dataSource.setUser(dataSourceConfig.getUsername());
        return dataSource;
    }
    public static Connection getConnection(XAConnection xaConnection) {
        Connection connection = null;
        try {
            connection = xaConnection.getConnection();
            return connection;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
    public static XAConnection getXAConnection(XADataSource xaDataSource) {
        XAConnection xaConnection = null;
        try {
            xaConnection = xaDataSource.getXAConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return xaConnection;
    }
    public static void closeConnection(Connection conn) {
        try {
            conn.close();
        } catch (SQLException e) {
            System.out.println("连接关闭失败");
        }
    }
}
//数据源的配置用枚举替代
package org.idea.spring.jdbc.xa;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * @author idea
 * @date created in 11:19 上午 2020/10/30
 */
@AllArgsConstructor
@Getter
public enum DataSourceConfig {
    DATA_SOURCE_1("jdbc:mysql://127.0.0.1:3306/test-db01","root","yourpassword"),
    DATA_SOURCE_2("jdbc:mysql://127.0.0.1:3306/test-db02","root","yourpassword");
    private String url;
    private String username;
    private String password;
}
复制代码


MySQL是如何实现XA规范


这里面会涉及到关于mysql的日志相关技术知识点。


mysql内部的binlog中会记录binlog_event,如果涉及到了和事务相关的操作就会记录一项xid_event信息,该事件会记录事务的xid。


当有事务执行prepare的时候,会先将数据写入到redolog(此时xid也会被写入到redolog),直到事务提交后再写入到binlog里面。


网络异常,图片无法展示
|


ps:redo log会将一些修改过的数据记录下来,防止数据还未实际刷入到磁盘中发生丢失。redo log的
目的在于数据备份+减少写磁盘压力。关于写入的数据如何从 buffer 中再去写入到 redo log 中可以
通过 innodb_flush_log_at_trx_Commit 参数来调节,本文就不做过多讲述。
复制代码


2PC的一些异常场景


1.在一些预料不到的情况下,假设发生了这种情形,第一到第二阶段中途,资源全部提交了。


此时会导致第二阶段的所有资源都提交失败,并且提示“已经提交”


解决思路:对于所有事务统一标记为已经提交。


网络异常,图片无法展示
|


2.第一到第二阶段中途,资源全部回滚了。


此时会导致第二阶段的所有资源都回滚了,并且提示“已经回滚”。


解决思路:对于所有事务统一标记为已经回滚。


网络异常,图片无法展示
|


3.第一到第二阶段中途,资源全部回滚了。


此时会导致第二阶段的部分资源回滚了,并且提示“已经回滚”。


解决思路:需要自行在业务系统中进行实现和设计。


网络异常,图片无法展示
|


关于提交异常这块可以去了解下Heuristic Decision(启发式决策)关键字。


如果想要自己实现一个可靠的DTP就需要考虑以下几个要点;


如何获取TM?
如何启动和结束一个事务
如何标识一个事务
如何保存和传递事务上下文
应用如何通过资源管理器操作共享资源
资源管理器如何实现准备阶段以及与提交阶段的逻辑
如何实现两阶段提交协议
如何实现在异常情况下进行事务恢复
复制代码


实现起来,确实还是挺复杂的。


2PC的缺点


比较占用资源,多个事务共同提交的时候需要进行等待,容易发生堵塞情况。

采用JTA这种方式的大事务手段,在一些访问量较高的场景中对于系统的资源消耗比较厉害,容易出现资源竞争系统激烈情况。


\

关于分布式事务的 tcc 专题涉及知识点比较多,后续专门出一篇文章进行整理。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
Java 关系型数据库 MySQL
面试被问分布式事务(2PC、3PC、TCC),这样解释没毛病!
面试被问分布式事务(2PC、3PC、TCC),这样解释没毛病!
506 0
面试被问分布式事务(2PC、3PC、TCC),这样解释没毛病!
|
2月前
|
分布式计算 算法
分布式系统设计之共识算法—2PC、3PC、 Paxos
分布式系统设计之共识算法—2PC、3PC、 Paxos
42 1
|
8月前
|
算法
分布式系统中的那些一致性(CAP、BASE、2PC、3PC、Paxos、ZAB、Raft)
本文介绍 CAP、BASE理论的正确理解、Paxos 算法如何保证一致性及死循环问题、ZAB 协议中原子广播及崩溃恢复以及 Raft 算法的动态演示。
61 0
|
3月前
|
存储 消息中间件 关系型数据库
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
解密分布式事务:CAP理论、BASE理论、两阶段提交(2PC)、三阶段提交(3PC)、补偿事务(TCC)、MQ事务消息、最大努力通知
|
8月前
|
算法 Oracle 关系型数据库
【分布式】分布式事务基础概念(2PC,3PC,TCC)
【分布式】分布式事务基础概念(2PC,3PC,TCC)
302 0
|
10月前
|
消息中间件 缓存 NoSQL
分布式事务 3PC
3PC(Three-Phase Commit)是一种增强型的2PC(Two-Phase Commit)协议,用于解决2PC协议存在的可靠性问题和性能问题。3PC协议将2PC协议的两个阶段分为了三个阶段,同时引入了超时机制,从而提高了协议的可靠性和容错性。
74 0
|
10月前
|
消息中间件 算法 程序员
分布式事务 2PC
分布式事务是指跨越多个计算机节点的事务,涉及到多个数据库或其他资源的访问和更新。在分布式事务中,由于数据分布在不同的节点上,因此需要采用一些特殊的技术来保证事务的一致性和可靠性,其中最常用的技术之一就是两阶段提交(Two-Phase Commit,2PC)。
97 0
分布式事务 2PC
|
11月前
【JavaP6大纲】分布式事务篇:两阶段提交(2PC)
【JavaP6大纲】分布式事务篇:两阶段提交(2PC)
|
11月前
【JavaP6大纲】分布式事务篇:三阶段提交(3PC)
【JavaP6大纲】分布式事务篇:三阶段提交(3PC)
|
消息中间件 存储 监控
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
784 0

热门文章

最新文章