DBPack 赋能 python 微服务协调分布式事务

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: DBPack 的分布式事务致力于实现对用户的业务无入侵,使用时下流行的sidecar架构,主要使用 ETCD Watch 机制来驱动分布式事务提交回滚,它对 HTTP 流量和 MYSQL 流量做了拦截代理,支持 AT 模式(自动补偿 SQL)和 TCC 模式(自动补偿 HTTP 请求)。

什么是分布式事务

事务处理几乎在每一个信息系统中都会涉及,它存在的意义是为了保证系统数据符合期望的,且相互关联的数据之间不会产生矛盾,即数据状态的一致性。

按照数据库的经典理论,原子性、隔离性、持久性。原子性要求数据要么修改要么回滚,隔离性要求事务之间相互独立不影响,持久性要求事务的执行能正确的持久化,不丢失数据。mysql 类的行式数据库通过 mvcc 多版本视图和 wal 预写日志等技术的协作,实现了单个服务使用单个数据源或者单个服务使用多个数据源场景的多事务的原子性、隔离性和持久性。

凤凰架构这本书中有描述,单个服务使用单个数据源称之为本地事务,单个服务使用多个数据源称之为全局事务,而分布式事务特指多个服务同时访问多个数据源的事务处理机制。

DBpack 简介

分布式事务的实现有很多方式,如可靠性事务队列,TCC事务,SAGA事务等。

可靠性事务队列,也就是BASE,听起来和强一致性的ACID,"酸碱"格格不入,它作为最终一致性的概念起源,系统性地总结了一种针对分布式事务的技术手段。

TCC 较为烦琐,如同名字所示,它分为以下三个阶段。

  • Try:尝试执行阶段,完成所有业务可执行性的检查(保障一致性),并且预留好全部需用到的业务资源(保障隔离性)。
  • Confirm:确认执行阶段,不进行任何业务检查,直接使用 Try 阶段准备的资源来完成业务处理。Confirm 阶段可能会重复执行,因此本阶段所执行的操作需要具备幂等性。
  • Cancel:取消执行阶段,释放 Try 阶段预留的业务资源。Cancel 阶段可能会重复执行,也需要满足幂等性。

SAGA 事务将事务进行了拆分,大事务拆分若干个小事务,将整个分布式事务 T 分解为 n 个子事务,同时为为每一个子事务设计对应的补偿动作。尽管补偿操作通常比冻结或撤销容易实现,但保证正向、反向恢复过程的能严谨地进行也需要花费不少的工夫。

DBPack 的分布式事务致力于实现对用户的业务无入侵,使用时下流行的sidecar架构,主要使用 ETCD Watch 机制来驱动分布式事务提交回滚,它对 HTTP 流量和 MYSQL 流量做了拦截代理,支持 AT 模式(自动补偿 SQL)和 TCC 模式(自动补偿 HTTP 请求)。

DBPack 的 AT 模式性能取决于全局锁的释放速度,哪个事务竞争到了全局锁就能对业务数据做修改,在单位时间内,全局锁的释放速度越快,竞争到锁的事务越多,性能越高。DBPack 创建全局事务、注册分支事务只是在 ETCD 插入两条 KV 数据,事务提交回滚时修改对应数据的状态,通过 ETCD Watch 机制感知到数据的变化就能立即处理数据的提交回滚,从而在交互上减少了很多 RPC 请求。

distributed-transaction-sidecar.drawio.png

DBPack 的 TCC 模式中,请求会先到达 sidecar 后再注册 TCC 事务分支,确保 Prepare 先于 Cancel 执行。具体到操作的业务数据,建议使用 XID 和 BranchID 加锁。

tcc.drawio.png

DBpack 赋能 python 微服务

以上的前戏已铺垫,后文以讲解python 微服务代码为主,不涉及 dbpack 源码,感兴趣的童鞋可去自行调试了解。

https://github.com/CECTC/dbpack-samples/blob/main/python

这里会提到三个微服务,首先是是事务发起方,其次是订单系统,最后是产品库存系统。而每一个微服务,都使用dbpack代理。事务发起方请求成功后,当订单正常commit后,产品库存要发生正常扣除,一旦一个微服务未完成,另一个则要发生回滚,也就是说,两个微服务系统要保持一致。

首先,模拟分布式事务发起方的服务,该服务会注册两个 handler,一个会发起正常的请求,走 dbpack 代理发起分布式事务,另一个会则会非正常返回。事务发起方会根据 http 的请求情况,决定是否要发起分布式事务回滚。

以下借用了 flask web 框架实现了事务发起方的两个handler,通过两个http请求我们可以模拟分布式事务发起或者回滚。

from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
create_so_url        = "http://order-svc:3001/createSo"
update_inventory_url = "http://product-svc:3002/allocateInventory"
@app.route('/v1/order/create', methods=['POST'])
def create_1():
   return create_so(rollback=False)
@app.route('/v1/order/create2', methods=['POST'])
def create_2():
   return create_so(rollback=True)
def create_so(rollback=True):
    xid = request.headers.get("x-dbpack-xid")
    so_items = [dict(
        product_sysno=1,
        product_name="apple iphone 13",
        original_price=6799,
        cost_price=6799,
        deal_price=6799,
        quantity=2,
    )]
    so_master = [dict(
        buyer_user_sysno = 10001,
        seller_company_code = "SC001",
        receive_division_sysno = 110105,
        receive_address = "beijing",
        receive_zip = "000001",
        receive_contact = "scott",
        receive_contact_phone =  "18728828296",
        stock_sysno = 1,
        payment_type = 1,
        so_amt = 6999 * 2,
        status = 10,
        appid = "dk-order",
        so_items = so_items,
    )]
    success = (jsonify(dict(success=True, message="success")), 200)
    failed = (jsonify(dict(success=False, message="failed")), 400)
    headers = {
        "Content-Type": "application/json",
        "xid": xid
    }
    so_req = dict(req=so_master)
    resp1 = requests.post(create_so_url, headers=headers, json=so_req)
    if resp1.status_code == 400:
        return failed
    ivt_req = dict(req=[dict(product_sysno= 1, qty=2)])
    resp2 = requests.post(update_inventory_url, headers=headers, json=ivt_req)
    if resp2.status_code == 400:
        return failed
    if rollback:
        print("rollback")
        return failed
    return success
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3000)

那么如何使用 dbpack 代理该服务呢?

$./dist/dbpack start --config ../dbpack-samples/configs/config-aggregation.yaml
$ cat ../dbpack-samples/configs/config-aggregation.yaml
listeners:
  - protocol_type: http
    socket_address:
      address: 0.0.0.0
      port: 13000
    config:
      backend_host: aggregation-svc:3000
    filters:
      - httpDTFilter
filters:
  - name: httpDTFilter
    kind: HttpDistributedTransaction
    conf:
      appid: aggregationSvc
      transaction_infos:
        - request_path: "/v1/order/create"
          timeout: 60000
        - request_path: "/v1/order/create2"
          timeout: 60000
distributed_transaction:
  appid: aggregationSvc
  retry_dead_threshold: 130000
  rollback_retry_timeout_unlock_enable: true
  etcd_config:
    endpoints:
      - etcd:2379

可想而知,以上的微服务两个 handler 是通过 filters这部分的定义来配置拦截的。

接着是订单系统。

from flask import Flask, jsonify, request
from datetime import datetime
import mysql.connector
import time
import random
app = Flask(__name__)
insert_so_master = "INSERT /*+ XID('{xid}') */ INTO order.so_master({keys}) VALUES ({placeholders})"
insert_so_item = "INSERT /*+ XID('{xid}') */ INTO order.so_item({keys}) VALUES ({placeholders})"
def conn():
    retry = 0
    while retry < 3:
        time.sleep(5)
        try:
            c = mysql.connector.connect(
              host="dbpack3",
              port=13308,
              user="dksl",
              password="123456",
              database="order",
              autocommit=True,
            )
            if c.is_connected():
                db_Info = c.get_server_info()
                print("Connected to MySQL Server version ", db_Info)
                return c
        except Exception as e:
            print(e.args)
        retry += 1 
connection = conn()
cursor = connection.cursor(prepared=True,)
@app.route('/createSo', methods=['POST'])
def create_so():
    xid = request.headers.get('xid')
    reqs = request.get_json()
    if xid and "req" in reqs:
        for res in reqs["req"]:
            res["sysno"] = next_id()
            res["so_id"] = res["sysno"]
            res["order_date"] = datetime.now()
            res_keys = [str(k) for k,v in res.items() if k != "so_items" and str(v) != ""]
            so_master = insert_so_master.format(
                xid=xid,
                keys=", ".join(res_keys),
                placeholders=", ".join(["%s"] * len(res_keys)),
            )
            try:
                cursor.execute(so_master, tuple(res.get(k, "") for k in res_keys))
            except Exception as e:
                print(e.args)
            so_items = res["so_items"]
            for item in so_items:
                item["sysno"] = next_id()
                item["so_sysno"] = res["sysno"]
                item_keys = [str(k) for k,v in item.items() if str(v) != "" ]
                so_item = insert_so_item.format(
                    xid=xid,
                    keys=", ".join(item_keys),
                    placeholders=", ".join(["%s"] * len(item_keys)),
                )
                try:
                    cursor.execute(so_item, tuple(item.get(k, "") for k in item_keys))
                except Exception as e:
                    print(e.args)
        return jsonify(dict(success=True, message="success")), 200
    return jsonify(dict(success=False, message="failed")), 400 
def next_id():
    return random.randrange(0, 9223372036854775807)
if __name__ == '__main__':
   app.run(host="0.0.0.0", port=3001)

注意到 sql 中以注解的形式添加使用了 xid ,主要是方便配合 dbpack 识别后做出相应的分布式事务处理,也就是回滚还是commit。

这里数据库连接使用 autocommit 这种方式。同时,使用 python 中的 mysql.connector 这个 lib 来支持 sql 传输中的二段式加密传输协议,见代码中声明的prepared=true

用以下命令,使用 dbpack 代理 order 微服务:

./dist/dbpack start --config ../dbpack-samples/configs/config-order.yaml

最后是产品库存系统,详细代码如下:

from flask import Flask, jsonify, request
import time
import mysql.connector
app = Flask(__name__)
allocate_inventory_sql = "update /*+ XID('{xid}') */ product.inventory set available_qty = available_qty - %s, allocated_qty = allocated_qty + %s where product_sysno = %s and available_qty >= %s;"
def conn():
    retry = 0
    while retry < 3:
        time.sleep(5)
        try:
            c = mysql.connector.connect(
              host="dbpack2",
              port=13307,
              user="dksl",
              password="123456",
              database="product",
              autocommit=True,
            )
            if c.is_connected():
                db_Info = c.get_server_info()
                print("Connected to MySQL Server version ", db_Info)
                return c
        except Exception as e:
            print(e.args)
        retry += 1 
connection = conn()
cursor = connection.cursor(prepared=True,)
@app.route('/allocateInventory', methods=['POST'])
def create_so():
    xid = request.headers.get('xid')
    reqs = request.get_json()
    if xid and "req" in reqs:
        for res in reqs["req"]:
            try:
                cursor.execute(allocate_inventory_sql.format(xid=xid), (res["qty"], res["qty"], res["product_sysno"], res["qty"],))
            except Exception as e:
                print(e.args)
        return jsonify(dict(success=True, message="success")), 200
    return jsonify(dict(success=False, message="failed")), 400
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=3002)

同样,用以下命令使用 dbpack 代理 product 微服务:

./dist/dbpack start --config ../dbpack-samples/configs/config-product.yaml

我们可以使用docker-compose一键拉起以上三个微服务:

docker-compose up

正常情况下,以下请求会触发订单系统和产品库存系统的正常 commit:

curl -XPOST http://localhost:13000/v1/order/create

而以下命令虽然正常请求了订单系统和产品库存的 API,不管事务是否正常执行,由于事务发起方状态码不正常,要求"回滚",所以会导致已经 commit 的微服务发生回滚,以此保证分布式系统的一致性:

curl -XPOST http://localhost:13000/v1/order/create2

参考资料


作者简介

朱晗,开源爱好者,目前就职于中国电子云

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
25天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
167 73
|
8天前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
24 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
3天前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
4天前
|
分布式计算 数据处理 MaxCompute
云产品评测|分布式Python计算服务MaxFrame
云产品评测|分布式Python计算服务MaxFrame
31 2
|
8天前
|
人工智能 分布式计算 数据处理
有奖评测,基于分布式 Python 计算服务 MaxFrame 进行数据处理
阿里云MaxCompute MaxFrame推出分布式Python计算服务MaxFrame评测活动,助力开发者高效完成大规模数据处理、可视化探索及ML/AI开发。活动时间为2024年12月17日至2025年1月31日,参与者需体验MaxFrame并发布评测文章,有机会赢取精美礼品。
|
8天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
29 1
|
20天前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
45 9
|
22天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
58 2
|
23天前
|
人工智能 分布式计算 数据处理
云产品评测:分布式Python计算服务MaxFrame
云产品评测:分布式Python计算服务MaxFrame
62 3
|
1月前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
71 1