使用强大的DBPack处理分布式事务(PHP使用教程)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 新兴的AT事务解决方案,例如Seata和Seata-golang,通过数据源代理层的资源管理器RM记录SQL回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如Seata只支持Java语言类型的微服务,Seata-golang只支持Go语言类型的微服务。为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。

主流的分布式事务的处理方案

近些年,随着微服务的广泛使用,业务对系统的分布式事务处理能力的要求越来越高。

早期的基于XA协议的二阶段提交方案,将分布式事务的处理放在数据库驱动层,实现了对业务的无侵入,但是对数据的锁定时间很长,性能较低。

现在主流的TCC事务方案和SAGA事务方案,都是基于业务补偿机制,虽然没有全局锁,性能很高,但是一定程度上入侵了业务逻辑,增加了业务开发人员的开发时间和系统维护成本。

新兴的AT事务解决方案,例如SeataSeata-golang,通过数据源代理层的资源管理器RM记录SQL回滚日志,跟随本地事务一起提交,大幅减少了数据的锁定时间,性能好且对业务几乎没有侵入。其缺点是支持的语言比较单一,例如Seata只支持Java语言类型的微服务,Seata-golang只支持Go语言类型的微服务。

为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。

DBPack是一个处理分布式事务的数据库代理,其能够拦截MySQL流量,生成对应的事务回滚镜像,通过与ETCD协调完成分布式事务,性能很高,且对业务没有入侵,能够自动补偿SQL操作,支持接入任何编程语言。DBPack还支持TCC事务模式,能够自动补偿HTTP请求。目前其demo已经有Java、Go、Python和PHP,TCC的sample也已经在路上了,demo示例可以关注dbpack-samples

最新版DBPack不仅支持预处理的sql语句,还支持text类型的sql。DBPack最新版还兼容了php8的pdo_mysql扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示

image-20220629161325409.png

这个包的内容为:

07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节
01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 header
00 // 标识 payload 为 OKPacket
00 // affected row
00 // last insert id
03 00 // 状态标志位
00 00 // warning 数量

dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active 异常。最新版本已经修复了这个问题。

下图是具体的DBPack事务流程图。

dbpack-workflow.png

其事务流程简要描述如下:

  1. 客户端向聚合层服务的DBPack代理发起HTTP请求。注意请求的地址和端口指向DBPack,并不直接指向实际API。
  2. DBPack生成全局唯一的XID,存储到ETCD中。
  3. 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过HTTP header(X-Dbpack-Xid)拿到XID了。此时,聚合服务调用服务1的接口,并传递XID。
  4. 服务1拿到XID,通过DBPack代理,注册分支事务(生成BranchID等信息,并存储到ETCD)。
  5. 服务1的分支事务注册成功后,DBPack自动生成本地事务的回滚镜像,随着本地事务一起commit。
  6. 服务2进行与服务1相同的步骤4和5。
  7. 聚合层服务根据服务1和服务2的结果,决定是全局事务提交还是回滚。如果是提交,则返回HTTP 200给DBPack(除200以外的状态码都会被DBPack认为是失败)。DBPack更新ETCD中的全局事务状态为全局提交中或回滚中。
  8. 服务1和服务2的DBPack,通过ETCD的watch机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。
  9. 所有的分支事务提交或回滚完成后,ETCD里的分支事务状态将更新为已提交或已回滚,聚合层服务的DBPack的协程会检测到全局事务已经完成,将从ETCD删除XID和BranchID等事务信息。

本文将以PHP语言为例,详细介绍如何使用PHP对接DBPack完成分布式事务。实际使用其他语言时,对接过程也是类似的。

使用PHP对接DBPack实现分布式事务

前置条件

  • 业务数据库为mysql数据库
  • 业务数据表为innodb类型
  • 业务数据表必须有主键

Step0: 安装ETCD

ETCD_VER=v3.5.3
# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
/tmp/etcd-download-test/etcd --version
/tmp/etcd-download-test/etcdctl version
/tmp/etcd-download-test/etcdutl version

Step1: 在业务数据库中创建undo_log表

undo_log表用于存储本地事务的回滚镜像。

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_unionkey` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Step2: 编写配置文件,对接DBPack

# 更新distributed_transaction.etcd_config.endpoints
# 更新listeners配置项,调整为实际聚合层服务的地址和端口
# 更新filters配置项,配置聚合层服务的API endpoint
vim /path/to/your/aggregation-service/config-aggregation.yaml
  • 更新distributed_transaction.etcd_config.endpoints
  • 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口
  • 更新data_source_cluster.dsn

vim /path/to/your/business-service/config-service.yaml

### Step3: 运行DBPack
```bash
git clone git@github.com:cectc/dbpack.git
cd dbpack
# build on local env
make build-local
# build on production env
make build
./dist/dbpack start --config /path/to/your/config-aggregation.yaml
./dist/dbpack start --config /path/to/your/config-service.yaml

Step4: 配置vhost,监听php项目端口

以Nginx为例,配置如下

server {
    listen 3001; # 暴露的服务端口
    index index.php index.html;
    root /var/www/code/; # 业务代码根目录
    location / {
        try_files $uri /index.php?$args;
    }
    location ~ \.php$ {
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass order-svc-app:9000; # php-fpm 端口
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}

Step5: 编写应用程序

aggregation service example

class AggregationSvc
{
    public function CreateSo(string $xid, bool $rollback): bool
    {
        $createSoSuccess = $this->createSoRequest($xid);
        if (!$createSoSuccess) {
            return false;
        }
        $allocateInventorySuccess = $this->allocateInventoryRequest($xid);
        if (!$allocateInventorySuccess) {
            return false;
        }
        if ($rollback) {
            return false;
        }
        return true;
    }
    // private function createSoRequest(string $xid) ...
    // private function allocateInventoryRequest(string $xid) ...
}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');
$reaHeaders = getallheaders();
$xid = $reaHeaders['X-Dbpack-Xid'] ?? '';
if (empty($xid)) {
    die('xid is not provided!');
}
$aggregationSvc = new AggregationSvc();
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    switch ($reqPath) {
        case '/v1/order/create':
            if ($aggregationSvc->CreateOrder($xid, false)) {
                responseOK();
            } else {
                responseError();
            }
        case '/v1/order/create2':
            if ($aggregationSvc->CreateSo($xid, true)) {
                responseOK();
            } else {
                responseError();
            }
            break;
        default:
            die('api not found');
    }
}
function responseOK() {
    http_response_code(200);
    echo json_encode([
        'success' => true,
        'message' => 'success',
    ]);
}
function responseError() {
    http_response_code(400);
    echo json_encode([
        'success' => false,
        'message' => 'fail',
    ]);
}

order service example

class OrderDB
{
    private PDO $_connection;
    private static OrderDB $_instance;
    private string $_host = 'dbpack-order';
    private int $_port = 13308;
    private string $_username = 'dksl';
    private string $_password = '123456';
    private string $_database = 'order';
    const insertSoMaster = "INSERT /*+ XID('%s') */ INTO order.so_master (sysno, so_id, buyer_user_sysno, seller_company_code, 
        receive_division_sysno, receive_address, receive_zip, receive_contact, receive_contact_phone, stock_sysno, 
        payment_type, so_amt, status, order_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)";
    const insertSoItem = "INSERT /*+ XID('%s') */ INTO order.so_item(sysno, so_sysno, product_sysno, product_name, cost_price, 
        original_price, deal_price, quantity) VALUES (?,?,?,?,?,?,?,?)";
    public static function getInstance(): OrderDB
    {
        if (empty(self::$_instance)) {
            self::$_instance = new self();
        }
        return self::$_instance;
    }
    private function __construct()
    {
        try {
            $this->_connection = new PDO(
                "mysql:host=$this->_host;port=$this->_port;dbname=$this->_database;charset=utf8",
                $this->_username,
                $this->_password,
                [
                    PDO::ATTR_PERSISTENT => true,
                    PDO::ATTR_EMULATE_PREPARES => false, // to let DBPack handle prepread sql
                ]
            );
        } catch (PDOException $e) {
            die($e->getMessage());
        }
    }
    private function __clone()
    {
    }
    public function getConnection(): PDO
    {
        return $this->_connection;
    }
    public function createSo(string $xid, array $soMasters): bool
    {
        $this->getConnection()->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        try {
            $this->getConnection()->beginTransaction();
            foreach ($soMasters as $master) {
                if (!$this->insertSo($xid, $master)) {
                    throw new PDOException("failed to insert soMaster");
                }
            }
            $this->getConnection()->commit();
        } catch (PDOException $e) {
            $this->getConnection()->rollBack();
            return false;
        }
        return true;
    }
    private function insertSo(string $xid, array $soMaster): bool
    {
        // insert into so_master, so_item ...
    }
}
$reqPath = strtok($_SERVER["REQUEST_URI"], '?');
$reqHeaders = getallheaders();
$xid = $reqHeaders['Xid'] ?? '';
if (empty($xid)) {
    die('xid is not provided!');
}
if ($_SERVER['REQUEST_METHOD'] === 'POST') {
    if ($reqPath === '/createSo') {
        $reqBody = file_get_contents('php://input');
        $soMasters = json_decode($reqBody, true);
        $orderDB = OrderDB::getInstance();
        $result = $orderDB->createSo($xid, $soMasters);
        if ($result) {
            responseOK();
        } else {
            responseError();
        }
    }
}
function responseOK() {
    http_response_code(200);
    echo json_encode([
        'success' => true,
        'message' => 'success',
    ]);
}
function responseError() {
    http_response_code(400);
    echo json_encode([
        'success' => false,
        'message' => 'fail',
    ]);
}

Step6: 访问聚合层业务接口

curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}

注意的点

  • 无论是使用mysqli驱动、pdo_mysql驱动,还是通过mysql_connect()连接数据库(<=php5.4),在start transaction;开始之后,后续的业务操作必须在同一个数据库连接上进行。
  • DBPack通过xid(全局事务唯一ID)在事务上下文中传播,业务数据库执行的业务SQL语句中,需要加入xid注释,这样DBPack才能根据xid处理对应的事务。例如insert /*+ XID('%s') */ into  xx ...;

参考链接


作者简介

卜贺贺。就职于日本楽天Rakuten CNTD,任Application Engineer,熟悉AT事务、Seata-golang和DBPack。GitHub:github.com/bohehe

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
应用服务中间件 PHP nginx
|
安全 小程序 PHP
PHP代码审计(七)Rips源代码审计系统使用教程
上一篇中提到的Seay源代码审计系统是由C#编写的winform程序,现在已经停止更新了,但是,还是比较好用的。 PHP代码审计还有另一个工具,也是一个神器Rips
329 0
|
9天前
|
关系型数据库 MySQL PHP
新手教程:PHP入门教程
《PHP入门教程》涵盖PHP简介、环境搭建、第一个PHP脚本、基本语法、表单处理、函数定义及进一步学习方向。PHP是一种开源脚本语言,适合Web开发。环境搭建可使用XAMPP/WAMP/MAMP或手动安装Apache、PHP和MySQL。通过编写简单代码和处理表单,逐步掌握变量、字符串、数组、条件语句和循环等基础语法。最后介绍数据库操作、会话管理和面向对象编程等高级特性。
101 67
|
2月前
|
XML 安全 PHP
PHP与SOAP Web服务开发:基础与进阶教程
本文介绍了PHP与SOAP Web服务的基础和进阶知识,涵盖SOAP的基本概念、PHP中的SoapServer和SoapClient类的使用方法,以及服务端和客户端的开发示例。此外,还探讨了安全性、性能优化等高级主题,帮助开发者掌握更高效的Web服务开发技巧。
|
3月前
|
tengine 应用服务中间件 Linux
Tengine、Nginx安装PHP命令教程
要在阿里云Linux上安装PHP,请先更新YUM源并启用PHP 8.0仓库,然后安装PHP及相关扩展。通过`php -v`命令验证安装成功后,需修改Nginx配置文件以支持PHP,并重启服务。最后,创建`phpinfo.php`文件测试安装是否成功。对于CentOS系统,还需安装EPEL源和Remi仓库,其余步骤类似。完成上述操作后,可通过浏览器访问`http://IP地址/phpinfo.php`测试安装结果。
|
3月前
|
SQL NoSQL MongoDB
一款基于分布式文件存储的数据库MongoDB的介绍及基本使用教程
一款基于分布式文件存储的数据库MongoDB的介绍及基本使用教程
61 0
|
4月前
|
PHP Windows
thinkPhP6.0安装教程图解--PHP框架安装
本文是一篇关于ThinkPHP 6.0安装教程的图解,包括环境检查、安装Composer、修改Composer镜像地址、安装ThinkPHP框架以及启动运行ThinkPHP的步骤。文章详细描述了每个步骤的操作方法,并提供了相应的命令和截图,帮助用户理解并顺利完成ThinkPHP 6.0的安装和运行。
thinkPhP6.0安装教程图解--PHP框架安装
|
5月前
|
Linux PHP
Linux CentOS 宝塔 Suhosin禁用php5.6版本eval函数详细图文教程
【8月更文挑战第27天】本文介绍两种禁用PHP执行的方法:使用`PHP_diseval_extension`禁用和通过`suhosin`禁用。由于`suhosin`不支持PHP8,仅适用于PHP7及以下版本,若服务器安装了PHP5.6,则需对应安装`suhosin-0.9.38`版本。文章提供了详细的安装步骤,并强调了宝塔环境下与普通环境下的PHP路径差异。安装完成后,在`php.ini`中添加`suhosin.so`扩展并设置`executor.disable_eval = on`以禁用执行功能。最后通过测试代码验证是否成功禁用,并重启`php-fpm`服务生效。
75 2
|
5月前
|
SQL 关系型数据库 MySQL
PHP与MySQL交互之基础教程
【8月更文挑战第31天】 在数字世界中,数据是推动一切的核心力量。本文将引导你探索PHP与MySQL的协同工作,通过实际代码示例,展示如何建立连接、执行查询以及处理结果集。无论你是初学者还是希望巩固知识的开发者,这篇文章都将为你提供宝贵的实践知识。
|
5月前
|
网络协议 API PHP
PhalApi:在宝塔一键安装部署PHP开源接口框架的教程
要在宝塔面板上一键安装部署PhalApi开源接口框架,首先进入宝塔软件商店,切换到“一键部署”选项,搜索“phalapi”并点击“一键部署”。安装时需填写接口域名、数据库名及密码,提交后等待安装完成。安装成功后可在宝塔面板中查看新站点和源代码目录,并通过DNS解析设置访问接口域名,如`http://myapi.phalapi.net/`。默认开启的调试模式便于测试,可通过修改`config/sys.php`中的`debug`值为`false`关闭。最后,在源代码中开发自己的PHP接口,PhalApi会自动生成在线接口文档,方便后续调用与维护。更多详细教程可参考官方文档。