微服务从代码到k8s部署应有尽有系列(九、事务精讲)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 微服务从代码到k8s部署应有尽有系列(九、事务精讲)

我们用一个系列来讲解从需求到上线、从代码到k8s部署、从日志到监控等各个方面的微服务完整实践。

整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。

实战项目地址:https://github.com/Mikaelemmmm/go-zero-looklook

关于分布式事务

因为本项目服务划分相对独立一些,所以目前没有使用到分布式事务,不过go-zero结合dtm使用分布式事务的最佳实践,我有整理demo,这里就介绍一下go-zero结合dtm的使用,项目地址go-zero结合dtm最佳实践仓库地址 :https://github.com/Mikaelemmmm/gozerodtm

【注】下面说的不是go-zero-looklook项目,是这个项目 https://github.com/Mikaelemmmm/gozerodtm

一、注意事项

  • go-zero 1.2.4版本以上,这个一定要注意
  • dtm 你用最新的就行了

二、clone dtm

git clone https://github.com/yedf/dtm.git

三、配置文件

1、找到项目跟文件夹下的conf.sample.yml

2、cp conf.sample.yml   conf.yml

3、使用etcd , 把配置中下面这段注释打开  (如果没用etcd就更简单了 ,这个都省了,直接链接到dtm server地址就可以了)

MicroService:
  Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover
  Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
  EndPoint: 'localhost:36790'

解释一下:

MicroService 这个不要动,这个代表要对把dtm注册到那个微服务服务集群里面去,使微服务集群内部服务可以通过grpc直接跟dtm交互

Driver :'dtm-driver-gozero' ,  使用go-zero的注册服务发现驱动,支持go-zero

Target: 'etcd://localhost:2379/dtmservice'  将当前dtm的server直接注册到微服务所在的etcd集群中,如果go-zero作为微服务使用的话,就可以直接通过etcd拿到dtm的server grpc链接,直接就可以跟dtm server交互了

EndPoint: 'localhost:36790' , 代表的是dtm的server的连接地址+端口 , 集群中的微服务可以直接通过etcd获得此地址跟dtm交互了,

如果你自己去改了dtm源码grpc端口,记得这里要改下端口

四、启动dtm server

在dtm项目根目录下

go run app/main.go dev

五、使用go-zero的grpc对接dtm

这是一个快速下单扣商品库存的例子

1、order-api

order-api是http服务入口创建订单

service order {
   @doc "创建订单"
   @handler create
   post /order/quickCreate (QuickCreateReq) returns (QuickCreateResp)
}

接下来看logic

func (l *CreateLogic) Create(req types.QuickCreateReq,r *http.Request) (*types.QuickCreateResp, error) {
 orderRpcBusiServer, err := l.svcCtx.Config.OrderRpcConf.BuildTarget()
 if err != nil{
  return nil,fmt.Errorf("下单异常超时")
 }
 stockRpcBusiServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()
 if err != nil{
  return nil,fmt.Errorf("下单异常超时")
 }
 createOrderReq:= &order.CreateReq{UserId: req.UserId,GoodsId: req.GoodsId,Num: req.Num}
 deductReq:= &stock.DecuctReq{GoodsId: req.GoodsId,Num: req.Num}
 // 这里只举了saga例子,tcc等其他例子基本没啥区别具体可以看dtm官网
 gid := dtmgrpc.MustGenGid(dtmServer)
 saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
  Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).
  Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)
 err = saga.Submit()
 dtmimp.FatalIfError(err)
 if err != nil{
  return nil,fmt.Errorf("submit data to  dtm-server err  : %+v \n",err)
 }
 return &types.QuickCreateResp{}, nil
}

进入到下单逻辑时,分别获取order订单、stock库存服务的rpc在etcd中的地址,使用BuildTarget()这个方法

然后创建order、stock对应的请求参数

请求dtm获取全局事务id , 基于此全局事务id开启grpc的saga分布式事务 ,将创建订单、扣减库存请求放入事务中,这里使用grpc形式请求,每个业务要有一个正向请求、一个回滚请求、以及请求参数,当执行其中任何一个业务正向请求出错了会自动调用事务中所有业务回滚请求达到回滚效果。

2、order-srv

order-srv是订单的rpc服务,与dtm-gozero-order数据库中order表交互

// service
service order {
   rpc create(CreateReq)returns(CreateResp);
   rpc createRollback(CreateReq)returns(CreateResp);
}

2.1 Create

当order-api提交事务默认请求的是create方法,我们看看logic

func (l *CreateLogic) Create(in *pb.CreateReq) (*pb.CreateResp, error) {
   fmt.Printf("创建订单 in : %+v \n", in)
   // barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务
   barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
   db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
   if err != nil {
      // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
      return nil, status.Error(codes.Internal, err.Error())
   }
   if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
      order := new(model.Order)
      order.GoodsId = in.GoodsId
      order.Num = in.Num
      order.UserId = in.UserId
      _, err = l.svcCtx.OrderModel.Insert(tx, order)
      if err != nil {
         return fmt.Errorf("创建订单失败 err : %v , order:%+v \n", err, order)
      }
      return nil
   }); err != nil {
      // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
      return nil, status.Error(codes.Internal, err.Error())
   }
   return &pb.CreateResp{}, nil
}

可以看到,一进入方法内部我们就使用了dtm的子事务屏障技术,至于为什么使用子事务屏障是因为可能会出现重复请求或者空请求造成的脏数据等,在这里dtm自动给我们做了幂等处理不需要我们自己在额外做了,同时保证他内部的幂等处理跟我们自己执行的事务要在一个事务中,所以要使用一个会话的db链接,这时候我们就要先获取

db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()

然后基于此db连接dtm在内部通过sql执行做幂等处理,同时我们基于此db连接开启事务,这样就能保证dtm内部的子事务屏障在执行sql操作与我们自己业务执行的sql操作在一个事务中。

dtm在使用grpc调用我们业务的时候,我们的grpc服务在返回给dtm server错误时候,dtm会根据我们返回给它的grpc错误码判断是要执行回滚操作还是一直重试:

  • codes.Internal :dtm server不会调用回滚,会一直重试,每次重试dtm的数据库中都会加一次重试次数,自己可以监控这个重试次数报警,人工处理
  • codes.Aborted :    dtm server会调用所有回滚请求,执行回滚操作

如果dtm在调用grpc返回错误是nil时候,就认为调用成功了

2.2 CreateRollback

当我们调用订单的创建订单或者库存扣减时候返回给dtm server 的codes.Aborted时候,dtm server会调用所有回滚操作,CreateRollback就是对应订单下单的回滚操作,代码如下

func (l *CreateRollbackLogic) CreateRollback(in *pb.CreateReq) (*pb.CreateResp, error) {
 fmt.Printf("订单回滚  , in: %+v \n", in)
 order, err := l.svcCtx.OrderModel.FindLastOneByUserIdGoodsId(in.UserId, in.GoodsId)
 if err != nil && err != model.ErrNotFound {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if order != nil {
  barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
  db, err := l.svcCtx.OrderModel.SqlDB()
  if err != nil {
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return nil, status.Error(codes.Internal, err.Error())
  }
  if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
   order.RowState = -1
   if err := l.svcCtx.OrderModel.Update(tx, order); err != nil {
    return fmt.Errorf("回滚订单失败  err : %v , userId:%d , goodsId:%d", err, in.UserId, in.GoodsId)
   }
   return nil
  }); err != nil {
   logx.Errorf("err : %v \n", err)
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return nil, status.Error(codes.Internal, err.Error())
  }
 }
 return &pb.CreateResp{}, nil
}

其实就是如果之前下单成功了,将之前下成功的单给取消掉就是对应下单的回滚操作

3、stock-srv

3.1 Deduct

扣减库存,这里跟order的Create一样了,是下单事务内的正向操作,扣减库存,代码如下

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {
 fmt.Printf("扣库存start....")
 stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
 if err != nil && err != model.ErrNotFound {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if stock == nil || stock.Num < in.Num {
  // 【回滚】库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }
 // barrier防止空补偿、空悬挂等具体看dtm官网即可,别忘记加barrier表在当前库中,因为判断补偿与要执行的sql一起本地事务
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 db, err := l.svcCtx.StockModel.SqlDB()
 if err != nil {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
  if err != nil{
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return status.Error(codes.Internal, err.Error())
  }
  affected, err := sqlResult.RowsAffected()
  if err != nil{
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return status.Error(codes.Internal, err.Error())
  }
  // 如果是影响行数为0,直接就告诉dtm失败不需要重试了
  if affected <= 0 {
   return  status.Error(codes.Aborted,  dtmcli.ResultFailure)
  }
  // !!开启测试!! 测试订单回滚更改状态为失效,并且当前库扣失败不需要回滚
  //return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)
  return nil
 }); err != nil {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil,err
 }
 return &pb.DeductResp{}, nil
}

这里值得注意的是当只有库存不足、或者在扣库存影响行数为0(未成功)才需要告诉dtm server要回滚,其他情况下其实都是网络抖动、硬件异常导致,应该让dtm server一直重试,当然自己要加个最大重试次数的监控报警,如果达到最大次数还未成功能实现自动发短信、打电话人工介入了。

3.2 DeductRollback

这里是对应扣库存的回滚操作

func (l *DeductRollbackLogic) DeductRollback(in *pb.DecuctReq) (*pb.DeductResp, error) {
 fmt.Printf("库存回滚 in : %+v \n", in)
 barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 db, err := l.svcCtx.StockModel.SqlDB()
 if err != nil {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  if err := l.svcCtx.StockModel.AddStock(tx, in.GoodsId, in.Num); err != nil {
   return fmt.Errorf("回滚库存失败 err : %v ,goodsId:%d , num :%d", err, in.GoodsId, in.Num)
  }
  return nil
 }); err != nil {
  logx.Errorf("err : %v \n", err)
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 return &pb.DeductResp{}, nil
}

六、子事务屏障

这个词是dtm作者定义的,其实子事务屏障代码不多,看barrier.CallWithDB这个方法即可。

// CallWithDB the same as Call, but with *sql.DB
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {   
  tx, err := db.Begin()   
  if err != nil {      
    return err   
  }   
  return bb.Call(tx, busiCall)
}

由于这个方法他在内部开启本地事务,它内部是在此事务执行了sql操作,所以在我们执行自己的业务时候必须跟它用同一个事务,那就要基于同一个db连接开事务了,so~ 你知道为什么我们要提前获取db连接了吧,目的就是要让它内部执行的sql操作跟我们的sql操作在一个事务下。至于它内部为什么执行自己的sql操作,接下来我们分析。

我们看bb.Call这个方法

// Call 子事务屏障,详细介绍见 https://zhuanlan.zhihu.com/p/388444465
// tx: 本地数据库的事务对象,允许子事务屏障进行事务操作
// busiCall: 业务函数,仅在必要时被调用
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
 bb.BarrierID = bb.BarrierID + 1
 bid := fmt.Sprintf("%02d", bb.BarrierID)
 defer func() {
  // Logf("barrier call error is %v", rerr)
  if x := recover(); x != nil {
   tx.Rollback()
   panic(x)
  } else if rerr != nil {
   tx.Rollback()
  } else {
   tx.Commit()
  }
 }()
 ti := bb
 originType := map[string]string{
  BranchCancel:     BranchTry,
  BranchCompensate: BranchAction,
 }[ti.Op]
 originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
 currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
 dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
 if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
  currentAffected == 0 { // 这个是重复请求或者悬挂
  return
 }
 rerr = busiCall(tx)
 return
}

核心其实就是如下几行代码

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op) 
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op) 
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected) 
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿  
  currentAffected == 0 { // 这个是重复请求或者悬挂  
  return 
}
rerr = busiCall(tx)
func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
  if op == "" {  
    return 0, nil 
 } 
  sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier") 
  return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}

每一个业务逻辑,dtm server在正常成功请求时候,  ti.Op 默认正常执行的操作是action,所以正常第一次请求都是ti.Op值都是action,那originType就是“”

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)

那么上面这个sql就不会执行因为ti.Op == "" 在insertBarrier中直接return了

currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)

那第二个sql的ti.Op是 action, 所以子事务屏障表barrier就会插入一条数据

同理在执行库存也会插入一条

1、整个事务都成功的子事务屏障

那在一次下订单正常成功请求下,由于 ti.Op都是action,所以originType都是"" , 所以不管是下单的barrier 还是扣库存的barrier,在执行他们2次barrier insert时候,originAffected都会忽略,因为originType==“” 会直接被return不插入数据,这样看来 不管是下单还是扣库存,barrier的第二条插入数据生效,所以barrier数据表中就会有2条下单数据,一条是订单的一条是扣库存的

cde3e9ee02489b598becafb880aa37ec.png

gid  : dtm全局事务id

branch_id : 每个全局事务id下的每个业务id

op : 操作,如果是正常成功请求就是action

barrier_id : 同一个业务下开多个会递增

这4个字段在表中是联合唯一索引,在insertBarrier时候,dtm判断如果存在就忽略不插入

2、如果订单成功库存不足回滚子事务屏障

我们库存只有10个 ,我们下单20个

1)当订单下成功,因为订单下单时候并不知道后续库存情况(即使在下单时候先去查库存那也会有查询时候足够,扣除时候不足情况),

所以下单成功barrier表中按照之前梳理的逻辑就会在barrier表中产生一条正确数据执行数据

6106ce6b75da7f1461f0192b92bb0b37.png

2)接着执行扣库存操作

func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {
 fmt.Printf("扣库存start....")
 stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
 if err != nil && err != model.ErrNotFound {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if stock == nil || stock.Num < in.Num {
  //【回滚】库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
 }
  
  .......
}

在执行扣库存业务逻辑之前,由于我们查询库存发现库存不足,所以直接return codes.Aborted 了,不会走到子事务屏障barrier这里,所以barrier表中不会插入数据,而是告诉dtm要回滚

3)调用order回滚操作

订单回滚的时候会开启barrier,这时候又会执行barrier代码(如下),由于回滚代码的ti.Op是compensate ,orginType就是action

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿  
  currentAffected == 0 { // 这个是重复请求或者悬挂  
  return 
}
rerr = busiCall(tx)

由于我们之前下订单成功了,barrier表里有一条下单成功时候的记录action,所以originAffected==0 ,所以只会插入一条当前回滚记录继续调用 busiCall(tx) 执行后续我们自己写的回滚操作

e15352f7abcc04fb42e7d7cebdca6128.png

到此,我们应该只有两条数据,一条订单成功创建记录,一条订单回滚记录

4)库存回滚DeductRollback

订单回滚成功后,会再继续调用库存回滚DeductRollback,库存回滚代码如下

这就是子事务屏障自动帮我们判断的,也就是那两条核心插入语句帮我们判断的,以至于我们业务不会出现脏数据

库存这里回滚分两种情况

  • 没扣成功回滚
  • 扣成功回滚
没扣成功回滚(我们当前举例场景是这个 )

首先调用库存回滚时候ti.Op是compensate ,orginType就是action ,会执行下面2条insert

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
  currentAffected == 0 { // 这个是重复请求或者悬挂
  return}rerr = busiCall(tx)
}

这里结合判断如果是回滚、取消操作,originAffected > 0 当前插入成功了,之前对应的正向扣库存操作没有插入成功,说明之前库存没扣成功,直接return就不需要执行后续补偿了。所以此时会在barrier表中插入2条数据直接return,就不会执行我们后续补偿操作了

到此我们barrier表中有4条数据了

扣成功回滚(这个情况自己可以尝试模拟此场景)

如果我们上一步扣库存成功,在执行此补偿的时候ti.Op是compensate ,orginType就是action ,继续执行2个insert语句

originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 这个是空补偿
  currentAffected == 0 { // 这个是重复请求或者悬挂
  return}rerr = busiCall(tx)
}

这里结合判断如果是回滚、取消操作,originAffected == 0 当前插入忽略了没插入进去,说明之前正向扣库存插入成功了,这里只插入第二个sql语句记录即可,然后在执行后续我们补偿的业务操作。

所以,整体分析下来核心语句就是2条insert,它帮我们解决了重复回滚数据、数据幂等情况,只能说dtm作者想法真的很好,用了最少的代码帮我们解决了一个很麻烦的问题

七、go-zero对接中注意事项

1、dtm的回滚补偿

在使用dtm的grpc时候,当我们使用saga、tcc等如果第一步尝试或者执行失败了,是希望它能执行后面的rollback的,在grpc中的服务如果发生错误了,必须返回 :status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他错误,不会执行你的rollback操作,dtm会一直重试,如下:

stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
if err != nil && err != model.ErrNotFound {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
}
if stock == nil || stock.Num < in.Num {
  //【回滚】库存不足确定需要dtm直接回滚,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滚
  return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}

2、barrier的空补偿、悬挂等

之前准备工作中,我们创建了dtm_barrier库以及执行了barrier.mysql.sql,这个其实就是为我们的业务服务做了一个检查,防止空补偿,具体可以看barrier.Call中源码,没几行代码可以看懂的。

如果我们线上使用的话,你的每个与db交互的服务只要用到了barrier,这个服务使用到的mysql账号,要给他分配barrier库的权限,这个不要忘记了

3、barrier在rpc中本地事务

在rpc的业务中,如果使用了barrier的话,那么在model中与db交互时候必须要用事务,并且一定要跟barrier用同一个事务

logic

barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
 db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
 if err != nil {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, status.Error(codes.Internal, err.Error())
 }
 if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
  sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
  if err != nil{
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return status.Error(codes.Internal, err.Error())
  }
  affected, err := sqlResult.RowsAffected()
  if err != nil{
   // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
   return status.Error(codes.Internal, err.Error())
  }
  // 如果是影响行数为0,直接就告诉dtm失败不需要重试了
  if affected <= 0 {
   return  status.Error(codes.Aborted,  dtmcli.ResultFailure)
  }
  // !!开启测试!! : 测试订单回滚更改状态为失效,并且当前库扣失败不需要回滚
  // return fmt.Errorf("扣库存失败 err : %v , in:%+v \n",err,in)
  return nil
 }); err != nil {
  // !!!一般数据库不会错误不需要dtm回滚,就让他一直重试,这时候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具体自己把控!!!
  return nil, err
 }

model

func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) {
 query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table)
 return tx.Exec(query,num, goodsId,num)
}
func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error {
 query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table)
 _, err :=tx.Exec(query, num, goodsId)
 return err
}

七、使用go-zero的http对接

这个基本没啥难度,grpc会了这个很简单,鉴于go在微服务中去使用http场景不多,这里就不详细去做了,我之前一个版本写过一个简单的,但是没这个完善,有兴趣可以去看下,不过那个barrier是自己基于go-zero的sqlx,将dtm的官方的做了修改,现在不需要了。

项目地址:https://github.com/Mikaelemmmm/dtmbarrier-go-zero

项目地址

https://github.com/zeromicro/go-zero

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
3月前
|
Kubernetes Java 调度
无需接入执行器,0 代码改造实现微服务任务调度
本文提出了一种基于云原生的任务调度新方案,不需要依赖SDK,不依赖语言,实现定时调度和分布式跑批。
275 33
|
3月前
|
Kubernetes 调度 微服务
无需接入执行器,0代码改造实现微服务任务调度
本文提出了一种基于云原生的任务调度新方案,不需要依赖SDK,不依赖语言,实现定时调度和分布式跑批
220 1
|
4月前
|
jenkins Java 持续交付
使用 Jenkins 和 Spring Cloud 自动化微服务部署
随着单体应用逐渐被微服务架构取代,企业对快速发布、可扩展性和高可用性的需求日益增长。Jenkins 作为领先的持续集成与部署工具,结合 Spring Cloud 提供的云原生解决方案,能够有效简化微服务的开发、测试与部署流程。本文介绍了如何通过 Jenkins 实现微服务的自动化构建与部署,并结合 Spring Cloud 的配置管理、服务发现等功能,打造高效、稳定的微服务交付流程。
624 0
使用 Jenkins 和 Spring Cloud 自动化微服务部署
|
10月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——常见问题总结
本文总结了Spring Boot中使用事务的常见问题,虽然通过`@Transactional`注解可以轻松实现事务管理,但在实际项目中仍有许多潜在坑点。文章详细分析了三个典型问题:1) 异常未被捕获导致事务未回滚,需明确指定`rollbackFor`属性;2) 异常被try-catch“吃掉”,应避免在事务方法中直接处理异常;3) 事务范围与锁范围不一致引发并发问题,建议调整锁策略以覆盖事务范围。这些问题看似简单,但一旦发生,排查难度较大,因此开发时需格外留意。最后,文章提供了课程源代码下载地址,供读者实践参考。
265 0
|
10月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——Spring Boot 事务配置
本文介绍了 Spring Boot 中的事务配置与使用方法。首先需要导入 MySQL 依赖,Spring Boot 会自动注入 `DataSourceTransactionManager`,无需额外配置即可通过 `@Transactional` 注解实现事务管理。接着通过创建一个用户插入功能的示例,展示了如何在 Service 层手动抛出异常以测试事务回滚机制。测试结果表明,数据库中未新增记录,证明事务已成功回滚。此过程简单高效,适合日常开发需求。
1396 0
|
10月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——事务相关
本文介绍Spring Boot事务配置管理,阐述事务在企业应用开发中的重要性。事务确保数据操作可靠,任一异常均可回滚至初始状态,如转账、购票等场景需全流程执行成功才算完成。同时,事务管理在Spring Boot的service层广泛应用,但根据实际需求也可能存在无需事务的情况,例如独立数据插入操作。
270 0
|
10月前
|
存储 Kubernetes 开发工具
使用ArgoCD管理Kubernetes部署指南
ArgoCD 是一款基于 Kubernetes 的声明式 GitOps 持续交付工具,通过自动同步 Git 存储库中的配置与 Kubernetes 集群状态,确保一致性与可靠性。它支持实时同步、声明式设置、自动修复和丰富的用户界面,极大简化了复杂应用的部署管理。结合 Helm Charts,ArgoCD 提供模块化、可重用的部署流程,显著减少人工开销和配置错误。对于云原生企业,ArgoCD 能优化部署策略,提升效率与安全性,是实现自动化与一致性的理想选择。
691 0
|
6月前
|
存储 监控 Shell
SkyWalking微服务监控部署与优化全攻略
综上所述,虽然SkyWalking的初始部署流程相对复杂,但通过一步步的准备和配置,可以充分发挥其作为可观测平台的强大功能,实现对微服务架构的高效监控和治理。尽管未亲临,心已向往。将一件事做到极致,便是天分的展现。
|
9月前
|
存储 Kubernetes 异构计算
Qwen3 大模型在阿里云容器服务上的极简部署教程
通义千问 Qwen3 是 Qwen 系列最新推出的首个混合推理模型,其在代码、数学、通用能力等基准测试中,与 DeepSeek-R1、o1、o3-mini、Grok-3 和 Gemini-2.5-Pro 等顶级模型相比,表现出极具竞争力的结果。
|
8月前
|
人工智能 搜索推荐 前端开发
从代码到心灵对话:我的CodeBuddy升级体验之旅(个性化推荐微服务系统)
本文分享了使用CodeBuddy最新版本的深度体验,重点探讨了Craft智能体、MCP协议和DeepSeek V3三大功能。Craft实现从对话到代码的无缝转化,大幅提升开发效率;MCP协议打通全流程开发,促进团队协作;DeepSeek V3则将代码补全提升至新境界,显著减少Bug并优化跨语言开发。这些功能共同塑造了AI与程序员共生的未来模式,让编程更高效、自然。
838 15

热门文章

最新文章

推荐镜像

更多