带你十天轻松搞定 Go 微服务之大结局(分布式事务)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 带你十天轻松搞定 Go 微服务之大结局(分布式事务)

序言

我们通过一个系列文章跟大家详细展示一个 go-zero 微服务示例,整个系列分十篇文章,目录结构如下:

  1. 环境搭建:带你十天轻松搞定 Go 微服务系列(一)
  2. 服务拆分:带你十天轻松搞定 Go 微服务系列(二)
  3. 用户服务:带你十天轻松搞定 Go 微服务系列(三)
  4. 产品服务:带你十天轻松搞定 Go 微服务系列(四)
  5. 订单服务:带你十天轻松搞定 Go 微服务系列(五)
  6. 支付服务:带你十天轻松搞定 Go 微服务系列(六)
  7. RPC 服务 Auth 验证:带你十天轻松搞定 Go 微服务系列(七)
  8. 服务监控:带你十天轻松搞定 Go 微服务系列(八、服务监控)
  9. 链路追踪:带你十天轻松搞定 Go 微服务系列(九、链路追踪)
  10. 分布式事务(本文)

期望通过本系列带你在本机利用 Docker 环境利用 go-zero 快速开发一个商城系统,让你快速上手微服务。

完整示例代码:https://github.com/nivin-studio/go-zero-mall

首先,我们来看一下整体的服务拆分图:

10.1 DTM 介绍

DTM 是一款 golang开发的分布式事务管理器,解决了跨数据库、跨服务、跨语言栈更新数据的一致性问题。

绝大多数的订单系统的事务都会跨服务,因此都有更新数据一致性的需求,都可以通过 DTM 大幅简化架构,形成一个优雅的解决方案。

而且 DTM 已经深度合作,原生的支持go-zero中的分布式事务,下面就来详细的讲解如何用 DTM 来帮助我们的订单系统解决一致性问题

10.2 go-zero 使用 DTM

首先我们回顾下 第五章 订单服务 中 order rpc 服务中 Create 接口处理逻辑。方法里判断了用户和产品的合法性,以及产品库存是否充足,最后通过 OrderModel 创建了一个新的订单,以及调用 product rpc 服务 Update 的接口更新了产品的库存。

func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 查询用户是否存在
 _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
  Id: in.Uid,
 })
 if err != nil {
  return nil, err
 }
 // 查询产品是否存在
 productRes, err := l.svcCtx.ProductRpc.Detail(l.ctx, &product.DetailRequest{
  Id: in.Pid,
 })
 if err != nil {
  return nil, err
 }
 // 判断产品库存是否充足
 if productRes.Stock <= 0 {
  return nil, status.Error(500, "产品库存不足")
 }
 newOrder := model.Order{
  Uid:    in.Uid,
  Pid:    in.Pid,
  Amount: in.Amount,
  Status: 0,
 }
 res, err := l.svcCtx.OrderModel.Insert(&newOrder)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 newOrder.Id, err = res.LastInsertId()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 _, err = l.svcCtx.ProductRpc.Update(l.ctx, &product.UpdateRequest{
  Id:     productRes.Id,
  Name:   productRes.Name,
  Desc:   productRes.Desc,
  Stock:  productRes.Stock - 1,
  Amount: productRes.Amount,
  Status: productRes.Status,
 })
 if err != nil {
  return nil, err
 }
 return &order.CreateResponse{
  Id: newOrder.Id,
 }, nil
}

之前我们说过,这里处理逻辑存在数据一致性问题,有可能订单创建成功了,但是在更新产品库存的时候可能会发生失败,这时候就会存在订单创建成功,产品库存没有减少的情况。

因为这里的产品库存更新是跨服务操作的,也没有办法使用本地事务来处理,所以我们需要使用分布式事务来处理它。这里我们需要借助 DTMSAGA 协议来实现订单创建和产品库存更新的跨服务分布式事务操作。

大家可以先移步到 DTM 的文档先了接下 SAGA事务模式。

10.2.1 添加 DTM 服务配置

参见 第一章 环境搭建,修改 dtm->config.yml 配置文件。我们只要修改 MicroService 中的 TargetEndPoint 配置即可,将 dtm 注册到 etcd 中。

# ......
# 微服务
MicroService:
  Driver: 'dtm-driver-gozero'           # 要处理注册/发现的驱动程序的名称
  Target: 'etcd://etcd:2379/dtmservice' # 注册 dtm 服务的 etcd 地址
  EndPoint: 'dtm:36790'
# ......

10.2.2 添加 dtm_barrier 数据表

微服务是一个分布式系统,因此可能发生各种异常,例如网络抖动导致重复请求,这类的异常会让业务处理异常复杂。而 DTM 中,首创了 子事务屏障 技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。

使用 DTM 提供的子事务屏障技术则需要在业务数据库中创建子事务屏障相关的表,建表语句如下:

create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(
  id bigint(22) PRIMARY KEY AUTO_INCREMENT,
  trans_type varchar(45) default '',
  gid varchar(128) default '',
  branch_id varchar(128) default '',
  op varchar(45) default '',
  barrier_id varchar(45) default '',
  reason varchar(45) default '' comment 'the branch type who insert this record',
  create_time datetime DEFAULT now(),
  update_time datetime DEFAULT now(),
  key(create_time),
  key(update_time),
  UNIQUE key(gid, branch_id, op, barrier_id)
);

注意:库名和表名请勿修改,如果您自定义了表名,请在使用前调用 dtmcli.SetBarrierTableName

10.2.3 修改 OrderModelProductModel

在每一个子事务中,很多操作逻辑,需要使用到本地事务,所以我们添加一些 model 方法兼容 DTM 的子事务屏障

$ vim mall/service/order/model/ordermodel.go
package model
......
type (
 OrderModel interface {
  TxInsert(tx *sql.Tx, data *Order) (sql.Result, error)
  TxUpdate(tx *sql.Tx, data *Order) error
 }
)
......
func (m *defaultOrderModel) TxInsert(tx *sql.Tx, data *Order) (sql.Result, error) {
 query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, orderRowsExpectAutoSet)
 ret, err := tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status)
 return ret, err
}
func (m *defaultOrderModel) TxUpdate(tx *sql.Tx, data *Order) error {
 productIdKey := fmt.Sprintf("%s%v", cacheOrderIdPrefix, data.Id)
 _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, orderRowsWithPlaceHolder)
  return tx.Exec(query, data.Uid, data.Pid, data.Amount, data.Status, data.Id)
 }, productIdKey)
 return err
}
func (m *defaultOrderModel) FindOneByUid(uid int64) (*Order, error) {
 var resp Order
 query := fmt.Sprintf("select %s from %s where `uid` = ? order by create_time desc limit 1", orderRows, m.table)
 err := m.QueryRowNoCache(&resp, query, uid)
 switch err {
 case nil:
  return &resp, nil
 case sqlc.ErrNotFound:
  return nil, ErrNotFound
 default:
  return nil, err
 }
}
$ vim mall/service/product/model/productmodel.go
package model
......
type (
 ProductModel interface {
  TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error)
 }
)
......
func (m *defaultProductModel) TxAdjustStock(tx *sql.Tx, id int64, delta int) (sql.Result, error) {
 productIdKey := fmt.Sprintf("%s%v", cacheProductIdPrefix, id)
 return m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
  query := fmt.Sprintf("update %s set stock=stock+? where stock >= -? and id=?", m.table)
  return tx.Exec(query, delta, delta, id)
 }, productIdKey)
}

10.2.4 修改 product rpc 服务

  • 添加 DecrStock, DecrStockRevert 接口方法
    我们需要为 product rpc 服务添加 DecrStockDecrStockRevert 两个接口方法,分别用于产品库存更新 和 产品库存更新的补偿。
$ vim mall/service/product/rpc/product.proto
syntax = "proto3";
package productclient;
option go_package = "product";
......
// 减产品库存
message DecrStockRequest {
    int64 id = 1;
    int64 num = 2;
}
message DecrStockResponse {
}
// 减产品库存
service Product {
    ......
    rpc DecrStock(DecrStockRequest) returns(DecrStockResponse);
    rpc DecrStockRevert(DecrStockRequest) returns(DecrStockResponse);
}

提示:修改后使用 goctl 工具重新生成下代码。

  • 实现 DecrStock 接口方法
    在这里只有库存不足时,我们不需要再重试,直接回滚。
$ vim mall/service/product/rpc/internal/logic/decrstocklogic.go
package logic
import (
 "context"
 "database/sql"
 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"
 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/codes"
 "google.golang.org/grpc/status"
)
type DecrStockLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}
func NewDecrStockLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockLogic {
 return &DecrStockLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}
func (l *DecrStockLogic) DecrStock(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  result, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, -1)
  if err != nil {
   return err
  }
  affected, err := result.RowsAffected()
  // 库存不足,返回子事务失败
  if err == nil && affected == 0 {
   return dtmcli.ErrFailure
  }
  return err
 })
 // 这种情况是库存不足,不再重试,走回滚
 if err == dtmcli.ErrFailure {
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }
 if err != nil {
  return nil, err
 }
 return &product.DecrStockResponse{}, nil
}
  • 实现 DecrStockRevert 接口方法
    DecrStock 接口方法中,产品库存是减去指定的数量,在这里我们把它给加回来。这样产品库存就回到在 DecrStock 接口方法减去之前的数量。
$ vim mall/service/product/rpc/internal/logic/decrstockrevertlogic.go
package logic
import (
 "context"
 "database/sql"
 "mall/service/product/rpc/internal/svc"
 "mall/service/product/rpc/product"
 "github.com/dtm-labs/dtmcli"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)
type DecrStockRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}
func NewDecrStockRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DecrStockRevertLogic {
 return &DecrStockRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}
func (l *DecrStockRevertLogic) DecrStockRevert(in *product.DecrStockRequest) (*product.DecrStockResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 err = barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 更新产品库存
  _, err := l.svcCtx.ProductModel.TxAdjustStock(tx, in.Id, 1)
  return err
 })
 if err != nil {
  return nil, err
 }
 return &product.DecrStockResponse{}, nil
}

10.2.5 修改 order rpc 服务

  • 添加 CreateRevert 接口方法
    order rpc 服务中已经有 Create 接口方法、我们需要创建它的补偿接口方法 DecrStockRevert
$ vim mall/service/order/rpc/order.proto
syntax = "proto3";
package orderclient;
option go_package = "order";
......
service Order {
    rpc Create(CreateRequest) returns(CreateResponse);
    rpc CreateRevert(CreateRequest) returns(CreateResponse);
    ......
}

提示:修改后使用 goctl 工具重新生成下代码。

  • 修改 Create 接口方法
    原来 Create 接口方法中产品库存判断和更新操作,我们已经在 product rpc DecrStock 接口方法中实现了,所以我们这里只要创建订单一个操作即可。
$ vim mall/service/order/rpc/internal/logic/createlogic.go
package logic
import (
 "context"
 "database/sql"
 "fmt"
 "mall/service/order/model"
 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)
type CreateLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateLogic {
 return &CreateLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}
func (l *CreateLogic) Create(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查询用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用户不存在")
  }
  newOrder := model.Order{
   Uid:    in.Uid,
   Pid:    in.Pid,
   Amount: in.Amount,
   Status: 0,
  }
  // 创建订单
  _, err = l.svcCtx.OrderModel.TxInsert(tx, &newOrder)
  if err != nil {
   return fmt.Errorf("订单创建失败")
  }
  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }
 return &order.CreateResponse{}, nil
}
  • 实现 CreateRevert 接口方法
    在这个接口中我们查询用户刚刚创建的订单,把订单的状态改为 9(无效状态)
$ vim mall/service/order/rpc/internal/logic/createrevertlogic.go
package logic
import (
 "context"
 "database/sql"
 "fmt"
 "mall/service/order/rpc/internal/svc"
 "mall/service/order/rpc/order"
 "mall/service/user/rpc/user"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "github.com/tal-tech/go-zero/core/stores/sqlx"
 "google.golang.org/grpc/status"
)
type CreateRevertLogic struct {
 ctx    context.Context
 svcCtx *svc.ServiceContext
 logx.Logger
}
func NewCreateRevertLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateRevertLogic {
 return &CreateRevertLogic{
  ctx:    ctx,
  svcCtx: svcCtx,
  Logger: logx.WithContext(ctx),
 }
}
func (l *CreateRevertLogic) CreateRevert(in *order.CreateRequest) (*order.CreateResponse, error) {
 // 获取 RawDB
 db, err := sqlx.NewMysql(l.svcCtx.Config.Mysql.DataSource).RawDB()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 获取子事务屏障对象
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 // 开启子事务屏障
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  // 查询用户是否存在
  _, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &user.UserInfoRequest{
   Id: in.Uid,
  })
  if err != nil {
   return fmt.Errorf("用户不存在")
  }
  // 查询用户最新创建的订单
  resOrder, err := l.svcCtx.OrderModel.FindOneByUid(in.Uid)
  if err != nil {
   return fmt.Errorf("订单不存在")
  }
  // 修改订单状态9,标识订单已失效,并更新订单
  resOrder.Status = 9
  err = l.svcCtx.OrderModel.TxUpdate(tx, resOrder)
  if err != nil {
   return fmt.Errorf("订单更新失败")
  }
  return nil
 }); err != nil {
  return nil, status.Error(500, err.Error())
 }
 return &order.CreateResponse{}, nil
}

10.2.6 修改 order api 服务

我们把 order rpc 服务 CreateCreateRevert 接口方法,product rpc 服务 DecrStockDecrStockRevert 接口方法,提到 order api 服务中做成一个以 SAGA事务模式 的分布式事务操作。

  • 添加 pproduct rpc 依赖配置
$ vim mall/service/order/api/etc/order.yaml
Name: Order
Host: 0.0.0.0
Port: 8002
......
OrderRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: order.rpc
ProductRpc:
  Etcd:
    Hosts:
    - etcd:2379
    Key: product.rpc
  • 添加 pproduct rpc 服务配置的实例化
$ vim mall/service/order/api/internal/config/config.go
package config
import (
 "github.com/tal-tech/go-zero/rest"
 "github.com/tal-tech/go-zero/zrpc"
)
type Config struct {
 rest.RestConf
 Auth struct {
  AccessSecret string
  AccessExpire int64
 }
 OrderRpc   zrpc.RpcClientConf
 ProductRpc zrpc.RpcClientConf
}
  • 注册服务上下文 pproduct rpc 的依赖
$ vim mall/service/order/api/internal/svc/servicecontext.go
package svc
import (
 "mall/service/order/api/internal/config"
 "mall/service/order/rpc/orderclient"
 "mall/service/product/rpc/productclient"
 "github.com/tal-tech/go-zero/zrpc"
)
type ServiceContext struct {
 Config config.Config
 OrderRpc   orderclient.Order
 ProductRpc productclient.Product
}
func NewServiceContext(c config.Config) *ServiceContext {
 return &ServiceContext{
  Config:     c,
  OrderRpc:   orderclient.NewOrder(zrpc.MustNewClient(c.OrderRpc)),
  ProductRpc: productclient.NewProduct(zrpc.MustNewClient(c.ProductRpc)),
 }
}
  • 添加导入 gozerodtm 驱动
$ vim mall/service/order/api/order.go
package main
import (
 ......
 _ "github.com/dtm-labs/driver-gozero" // 添加导入 `gozero` 的 `dtm` 驱动
)
var configFile = flag.String("f", "etc/order.yaml", "the config file")
func main() {
 ......
}
  • 修改 order api Create 接口方法
$ vim mall/service/order/api/internal/logic/createlogic.go
package logic
import (
 "context"
 "mall/service/order/api/internal/svc"
 "mall/service/order/api/internal/types"
 "mall/service/order/rpc/order"
 "mall/service/product/rpc/product"
 "github.com/dtm-labs/dtmgrpc"
 "github.com/tal-tech/go-zero/core/logx"
 "google.golang.org/grpc/status"
)
type CreateLogic struct {
 logx.Logger
 ctx    context.Context
 svcCtx *svc.ServiceContext
}
func NewCreateLogic(ctx context.Context, svcCtx *svc.ServiceContext) CreateLogic {
 return CreateLogic{
  Logger: logx.WithContext(ctx),
  ctx:    ctx,
  svcCtx: svcCtx,
 }
}
func (l *CreateLogic) Create(req types.CreateRequest) (resp *types.CreateResponse, err error) {
 // 获取 OrderRpc BuildTarget
 orderRpcBusiServer, err := l.svcCtx.Config.OrderRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "订单创建异常")
 }
 // 获取 ProductRpc BuildTarget
 productRpcBusiServer, err := l.svcCtx.Config.ProductRpc.BuildTarget()
 if err != nil {
  return nil, status.Error(100, "订单创建异常")
 }
 // dtm 服务的 etcd 注册地址
 var dtmServer = "etcd://etcd:2379/dtmservice"
 // 创建一个gid
 gid := dtmgrpc.MustGenGid(dtmServer)
 // 创建一个saga协议的事务
 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
  Add(orderRpcBusiServer+"/orderclient.Order/Create", orderRpcBusiServer+"/orderclient.Order/CreateRevert", &order.CreateRequest{
   Uid:    req.Uid,
   Pid:    req.Pid,
   Amount: req.Amount,
   Status: 0,
  }).
  Add(productRpcBusiServer+"/productclient.Product/DecrStock", productRpcBusiServer+"/productclient.Product/DecrStockRevert", &product.DecrStockRequest{
   Id:  req.Pid,
   Num: 1,
  })
 // 事务提交
 err = saga.Submit()
 if err != nil {
  return nil, status.Error(500, err.Error())
 }
 return &types.CreateResponse{}, nil
}

提示:SagaGrpc.Add 方法第一个参数 action 是微服务 grpc 访问的方法路径,这个方法路径需要分别去以下文件中寻找。

mall/service/order/rpc/order/order.pb.go

mall/service/product/rpc/product/product.pb.go

按关键字 Invoke 搜索即可找到。

10.3 测试 go-zero + DTM

10.3.1 测试分布式事务正常流程

  • 使用 postman 调用 /api/product/create 接口,创建一个产品,库存 stock1

  • 使用 postman 调用 /api/order/create 接口,创建一个订单,产品ID pid1

  • 我们可以看出,产品的库存从原来的 1 已经变成了 0

  • 我们再看下子事务屏障表 barrier 里的数据,我们可以看出两个服务的操作均已经完成。

10.3.2 测试分布式事务失败流程1

  • 接着上面测试结果,此时的产品ID为 1 的库存已经是 0了, 使用 postman 调用 /api/order/create 接口,再创建一个订单。

  • 我们看下订单数据表里有一条ID为 2 产品ID为 1 的数据,它的订单数据状态为 9

  • 我们再看下子事务屏障表 barrier 里的数据,我们可以看出(gid = fqYS8CbYbK8GkL8SCuTRUF)第一个服务(branch_id = 01)子事务屏障操作是正常,第二个服务(branch_id = 02)子事务屏障操作失败,要求补偿。于是两个服务都发生了补偿的操作记录。

  • 这个分布式事务的操作流程
  1. 首先 DTM 服务会调 order rpc Create 接口进行创建订单处理。
  2. 创建订单完成后 DTM 服务再调 product rpc DecrStock 接口,这个接口的里通过 pid 更新产品库存,因产品库存不足,抛出事务失败。
  3. DTM 服务发起补偿机制,调 order rpc CreateRevert 接口进行订单的补偿处理。
  4. DTM 服务发起补偿机制,调 product rpc DecrStockRevert 接口进行产品库存更新的补偿处理。但是因为在 product rpc DecrStock 接口的子事务屏障内,业务处理并未成功。所以在 DecrStockRevert 接口里不会执行子事务屏障内的业务逻辑。

10.3.3 测试分布式事务失败流程2

  • 我们在数据库中手动将产品ID为 1 库存修改为100,然后在 product rpc DecrStock 接口方法中子事务屏障外,人为的制造异常失败。

  • 使用 postman 调用 /api/order/create 接口,再创建一个订单,产品ID pid1

  • 我们分别来看下订单数据表和产品数据表,订单数据表ID为 3 的订单,它的订单数据状态为 9。产品数据表ID为 1 的产品,它的库存还是 100 且数据更新时间也发生了变化。

  • 我们再看下子事务屏障表 barrier 里的数据,我们可以看出(gid = ZbjYHv2jNra7RMwyWjB5Lc)第一个服务(branch_id = 01)子事务屏障操作是正常,第二个服务(branch_id = 02)子事务屏障操作也是正常。因为在 product rpc DecrStock 接口方法中子事务屏障外,我们人为的制造异常失败,所以两个服务发生了补偿的操作记录。

大家可以对比下 测试分布式事务失败流程1 与 测试分布式事务失败流程2 不同之处,是不是能发现和体会到 DTM 的这个子事务屏障技术的强大之处。

子事务屏障会自动识别正向操作是否已执行,失败流程1未执行业务操作,所以补偿时,也不会执行补偿的业务操作;失败流程2执行了业务操作,所以补偿时,也会执行补偿的业务操作。

项目地址

https://github.com/zeromicro/go-zero

欢迎使用 go-zerostar 支持我们!

关于勘误

由于公众号只能修改20个字,所以无法勘误,文章如有疑惑,请查看:

https://www.zhihu.com/people/kevwan/posts

所有勘误都会在这里修改。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
19天前
|
Go API Docker
热门go与微服务15
热门go与微服务15
28 2
|
19天前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
19天前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
监控 Go 微服务
带你十天轻松搞定 Go 微服务系列全集+勘误
带你十天轻松搞定 Go 微服务系列全集+勘误
|
2月前
|
消息中间件 人工智能 供应链
go-zero 微服务实战系列(二、服务拆分)
go-zero 微服务实战系列(二、服务拆分)
|
1月前
|
缓存 安全 Java
如何利用Go语言提升微服务架构的性能
在当今的软件开发中,微服务架构逐渐成为主流选择,它通过将应用程序拆分为多个小服务来提升灵活性和可维护性。然而,如何确保这些微服务高效且稳定地运行是一个关键问题。Go语言,以其高效的并发处理能力和简洁的语法,成为解决这一问题的理想工具。本文将探讨如何通过Go语言优化微服务架构的性能,包括高效的并发编程、内存管理技巧以及如何利用Go生态系统中的工具来提升服务的响应速度和资源利用率。
|
2月前
|
消息中间件 缓存 Kafka
go-zero微服务实战系列(八、如何处理每秒上万次的下单请求)
go-zero微服务实战系列(八、如何处理每秒上万次的下单请求)
|
2月前
|
缓存 NoSQL Redis
go-zero微服务实战系列(七、请求量这么高该如何优化)
go-zero微服务实战系列(七、请求量这么高该如何优化)
|
2月前
|
缓存 NoSQL 数据库
go-zero微服务实战系列(五、缓存代码怎么写)
go-zero微服务实战系列(五、缓存代码怎么写)
|
2月前
|
Java 数据库连接 微服务
揭秘微服务架构下的数据魔方:Hibernate如何玩转分布式持久化,实现秒级响应的秘密武器?
【8月更文挑战第31天】微服务架构通过将系统拆分成独立服务,提升了可维护性和扩展性,但也带来了数据一致性和事务管理等挑战。Hibernate 作为强大的 ORM 工具,在微服务中发挥关键作用,通过二级缓存和分布式事务支持,简化了对象关系映射,并提供了有效的持久化策略。其二级缓存机制减少数据库访问,提升性能;支持 JTA 保证跨服务事务一致性;乐观锁机制解决并发数据冲突。合理配置 Hibernate 可助力构建高效稳定的分布式系统。
54 0