最简单的 gRPC 教程—2 通信模式

简介: gRPC 包含四种基础的通信模式:• 一元模式(Unary RPC)• 服务器端流 RPC(Server Sreaming RPC)• 客户端流 RPC(Client Streaming RPC)• 双向流 RPC(Bidirectional Streaming RPC)

gRPC 包含四种基础的通信模式:

  • 一元模式(Unary RPC)
  • 服务器端流 RPC(Server Sreaming RPC)
  • 客户端流 RPC(Client Streaming RPC)
  • 双向流 RPC(Bidirectional Streaming RPC)


一元模式


一元模式是最简单、也最容易理解的通信方式,客户端发送单一的请求,并从服务端获取到响应。上一篇文章中的获取和添加商品的逻辑,其实就是很典型的一元 RPC 模式。

在这里,我们构建一个简单的订单服务,可以添加、获取、搜索、处理订单,借此来演示 gRPC 的几种通信模式。




首先定义一个 proto 文件(我命名为 OrderInfo.proto):

syntax = "proto3";
package order;
import "google/protobuf/wrappers.proto";
service OrderManagement {
  //获取订单
  rpc getOrder(google.protobuf.StringValue) returns(Order);
  //添加订单
  rpc addOrder(Order) returns(google.protobuf.StringValue);
}
message Order {
  //订单id
  string id = 1;
  //订单详情
  repeated string items = 2;
  //订单描述
  string description = 3;
  //订单价格
  float price = 4;
  //订单目的地
  string destination = 5;
}


然后使用命令 protoc --go_out=plugins=grpc:../order OrderInfo.proto 生成所需要的客户端存根文件(参考前一篇文章)。

然后新建一个 server/main.go 文件,写一下两个业务方法的逻辑:

type server struct {
   orderMap map[string]*order.Order
}
//获取订单
func (s *server) GetOrder(ctx context.Context, req *wrappers.StringValue) (resp *order.Order, err error) {
   resp = &order.Order{}
   id := req.Value
   var exist bool
   if resp, exist = s.orderMap[id]; !exist {
      err = status.Error(codes.NotFound, "order not found id = " + id)
      return
   }
   return
}
//添加订单
func (s *server) AddOrder(ctx context.Context, req *order.Order) (resp *wrappers.StringValue, err error) {
   resp = &wrappers.StringValue{}
   if s.orderMap == nil {
      s.orderMap = make(map[string]*order.Order)
   }
   v4, err := uuid.NewV4()
   if err != nil {
      return resp, status.Errorf(codes.Internal, "gen uuid err", err)
   }
   id := v4.String()
   req.Id = id
   s.orderMap[id] = req
   resp.Value = id
   return
}


然后继续在 main.go 中添加一个服务器:

func main() {
   listener, err := net.Listen("tcp", port)
   if err != nil {
      log.Println("net listen err ", err)
      return
   }
   s := grpc.NewServer()
   order.RegisterOrderManagementServer(s, &server{})
   log.Println("start gRPC listen on port " + port)
   if err := s.Serve(listener); err != nil {
      log.Println("failed to serve...", err)
      return
   }
}


接下来继续写客户端的逻辑,调用服务端的两个方法,获取到响应。和上一篇文章类似,新建一个 client/main.go 文件:

const address = "localhost:50051"
func main() {
   conn, err := grpc.Dial(address, grpc.WithInsecure())
   if err != nil {
      log.Println("did not connect.", err)
      return
   }
   defer conn.Close()
   ctx := context.Background()
   client := order.NewOrderManagementClient(conn)
   fmt.Println("-----------unary rpc-------------")
   id := AddOrder(ctx, client)
   GetOrder(ctx, client, id)
}
//添加一个订单
func AddOrder(ctx context.Context, client order.OrderManagementClient) string {
   odr := &order.Order{
      Description: "a new order for test-1",
      Price: 12322.232,
      Destination: "Shanghai",
      Items: []string{"doll", "22", "33"},
   }
   val, err := client.AddOrder(ctx, odr)
   if err != nil {
      log.Println("add order fail.", err)
      return ""
   }
   log.Println("add order success.id = ", val.String())
   return val.Value
}
//获取一个订单
func GetOrder(ctx context.Context, client order.OrderManagementClient, id string) {
   val, err := client.GetOrder(ctx, &wrappers.StringValue{Value: id})
   if err != nil {
      log.Println("get order err.", err)
      return
   }
   log.Printf("get order succes. order = %+v", val)
}


服务端流 RPC 模式


一元模式只有单一的请求和响应,但是服务器端流 RPC 模式下,服务器在收到客户端的请求后,可能会发送多个响应的序列,这被称为“流”。

在订单服务中,如果客户端根据关键字搜索订单,那么服务器会把所有满足条件的记录返回给客户端,通过这个例子来演示一下这种模式:




首先在 proto 文件中新增一个 rpc 方法:

//搜索订单
rpc searchOrder(google.protobuf.StringValue) returns(stream Order);

注意这里的返回值加上 stream 表示返回的是订单流,然后再使用 protoc 命令重新生成 OrderInfo.pb.go 文件。


然后在 server/main.go 文件中添加搜索订单的逻辑:

//搜索订单
func (s *server) SearchOrder(searchKey *wrappers.StringValue, stream order.OrderManagement_SearchOrderServer) (err error) {
   for _, val := range s.orderMap {
      for _, item := range val.Items {
         if strings.Contains(item, searchKey.Value) {
            err = stream.Send(val)
            if err != nil {
               log.Println("stream send order err.", err)
               return
            }
            break
         }
      }
   }
   return
}


客户端的写法也和之前的有一些区别了:

func SearchOrder(ctx context.Context, client order.OrderManagementClient) {
   searchKey := "Apple"
   searchStream, _ := client.SearchOrder(ctx, &wrappers.StringValue{Value: searchKey})
   for {
      val, err := searchStream.Recv()
      if err == io.EOF { //服务端没有数据了
         break
      }
      log.Printf("search order from server : %+v", val)
   }
   return
}


客户端流 RPC 模式


在这种模式下,客户端会发送多个请求给服务端,服务端会发送一条响应到客户端。在订单服务中,我们会更新一批订单,客户端则通过流的的方式传输过来。




还是在 proto 文件中定义一个新的方法:

//更新订单
rpc updateOrder(stream Order) returns(google.protobuf.StringValue);

注意这里在 rpc 的方法入参中多了 stream 关键字,表示是客户端流,然后再使用 protoc 生成文件。


还是老样子,在 server/main.go 中添加更新订单的逻辑:

//更新订单
func (s *server) UpdateOrder(stream order.OrderManagement_UpdateOrderServer) (err error) {
   updatedIds := "updated order ids : "
   for {
      val, err := stream.Recv()
      if err == io.EOF { //完成读取订单流
         //向客户端发送消息
         return stream.SendAndClose(&wrappers.StringValue{Value: updatedIds})
      }
      s.orderMap[val.Id] = val
      log.Println("[server]update the order : ", val.Id)
      updatedIds += val.Id + ", "
   }


然后继续在客户端 client/main.go 中添加:

//更新订单
func UpdateOrder(ctx context.Context, client order.OrderManagementClient) {
   updateStream, _ := client.UpdateOrder(ctx)
   order1 := &order.Order{Id: "103", Items: []string{"Apple Watch S6"}, Destination: "San Jose, CA", Price: 4400.00}
   order2 := &order.Order{Id: "105", Items: []string{"Amazon Kindle"}, Destination: "San Jose, CA", Price: 330.00}
   //更新订单1
   if err := updateStream.Send(order1); err != nil {
      log.Println("send order err.", err)
   }
   //更新订单2
   if err := updateStream.Send(order2); err != nil {
      log.Println("send order err.", err)
   }
   //关闭流并接收响应
   recv, err := updateStream.CloseAndRecv()
   if err != nil {
      log.Println("close and recv err.", err)
      return
   }
   log.Printf("the update result : %+v", recv)
}


双向流 RPC 模式


双向流 RPC 模式其实就是客户端和服务端流 RPC 的组合,客户端以消息流的方式发送数据,服务端也已消息流的方式响应数据。



在订单服务中,我们可能需要处理订单,客户端发送连续的订单信息,服务端对订单进行发货处理。首先我们在 proto 文件中定义一个新的方法:

//处理订单
rpc processOrder(stream google.protobuf.StringValue) returns(stream CombinedShipment);
...
message CombinedShipment {
  string id = 1;
  string status = 2;
  repeated Order orderList = 3;
}


然后在 server/main.go 中实现该方法的逻辑:

//处理订单
func (s * server) ProcessOrder(stream order.OrderManagement_ProcessOrderServer) (err error) {
   var combinedShipmentMap = make(map[string]*order.CombinedShipment)
   for {
      val, err := stream.Recv() //接收从客户端发送来的订单
      if err == io.EOF { //接收完毕,返回结果
         for _, shipment := range combinedShipmentMap {
            if err := stream.Send(shipment); err != nil {
               log.Println("[server] process finished!")
               return err
            }
         }
         break
      }
      if err != nil {
         log.Println(err)
         break
      }
      if val != nil {
         orderId := val.Value
         log.Printf("[server]reading order : %+v\n", orderId)
         dest := s.orderMap[orderId].Destination
         shipment, exist := combinedShipmentMap[dest]
         if exist {
            ord := s.orderMap[orderId]
            shipment.OrderList = append(shipment.OrderList, ord)
            combinedShipmentMap[dest] = shipment
         } else {
            comShip := &order.CombinedShipment{Id: "cmb - " + (s.orderMap[orderId].Destination), Status: "Processed!", }
            ord := s.orderMap[orderId]
            comShip.OrderList = append(comShip.OrderList, ord)
            combinedShipmentMap[dest] = comShip
            log.Println(len(comShip.OrderList), comShip.GetId())
         }
      }
   }
   return
}


这里的处理逻辑比起前面的几个稍微复杂了一点,需要多看下,自己动手敲一下。

然后就是客户端的逻辑:

//处理订单
func ProcessOrder(ctx context.Context, client order.OrderManagementClient) {
   processStream, _ := client.ProcessOrder(ctx)
   //发送两个订单处理
   if err := processStream.Send(&wrappers.StringValue{Value: "103"}); err != nil {
      log.Println("send order err.", err)
   }
   if err := processStream.Send(&wrappers.StringValue{Value: "105"}); err != nil {
      log.Println("send order err.", err)
   }
   chn := make(chan struct{})
   //异步接收服务端的结果
   go processResultFromServer(processStream, chn)
   //再发送一个订单
   if err := processStream.Send(&wrappers.StringValue{Value: "106"}); err != nil {
      log.Println("send order err.", err)
   }
   //发送完毕后记得关闭
   if err := processStream.CloseSend(); err != nil {
      log.Println("close send err.", err)
   }
   <-chn
}
//从服务端获取处理的结果
func processResultFromServer(stream order.OrderManagement_ProcessOrderClient, chn chan struct{}) {
   defer close(chn)
   for {
      shipment, err := stream.Recv()
      if err == io.EOF {
         log.Println("[client]结束从服务端接收数据")
         break
      }
      log.Printf("[client]server process result : %+v\n", shipment)
   }
}
相关文章
|
3月前
|
JSON Dubbo Java
【Dubbo协议指南】揭秘高性能服务通信,选择最佳协议的终极攻略!
【8月更文挑战第24天】在分布式服务架构中,Apache Dubbo作为一款高性能的Java RPC框架,支持多种通信协议,包括Dubbo协议、HTTP协议及Hessian协议等。Dubbo协议是默认选择,采用NIO异步通讯,适用于高要求的内部服务通信。HTTP协议通用性强,利于跨语言调用;Hessian协议则在数据传输效率上有优势。选择合适协议需综合考虑性能需求、序列化方式、网络环境及安全性等因素。通过合理配置,可实现服务性能最优化及系统可靠性提升。
57 3
|
3月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
569 0
|
5月前
|
C++
gRPC 四模式之 服务器端流RPC模式
gRPC 四模式之 服务器端流RPC模式
133 0
|
5月前
|
安全 C++
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
54 0
|
5月前
|
存储 C++
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
194 0
|
网络协议 物联网 开发者
详细介绍 MQTT 的工作原理,包括 MQTT 协议的特点、核心概念以及消息传递的流程
详细介绍 MQTT 的工作原理,包括 MQTT 协议的特点、核心概念以及消息传递的流程
3303 1
|
传感器 Cloud Native 物联网
gRpc的四种通信方式详细介绍
gRpc的四种通信方式详细介绍
225 0
|
人工智能 安全 网络协议
高质量通信gRPC入门,有了它,谁还用Socket
高质量通信gRPC入门,有了它,谁还用Socket
1284 0
高质量通信gRPC入门,有了它,谁还用Socket
|
运维 监控 Dubbo
Dubbo协议异步单一长连接原理与优势
Dubbo协议异步单一长连接原理与优势
566 0
聊一聊 gRPC 的四种通信模式
聊一聊 gRPC 的四种通信模式