gRPC 包含四种基础的通信模式:
- 一元模式(Unary RPC)
- 服务器端流 RPC(Server Sreaming RPC)
- 客户端流 RPC(Client Streaming RPC)
- 双向流 RPC(Bidirectional Streaming RPC)
一元模式
一元模式是最简单、也最容易理解的通信方式,客户端发送单一的请求,并从服务端获取到响应。上一篇文章中的获取和添加商品的逻辑,其实就是很典型的一元 RPC 模式。
在这里,我们构建一个简单的订单服务,可以添加、获取、搜索、处理订单,借此来演示 gRPC 的几种通信模式。
首先定义一个 proto 文件(我命名为 OrderInfo.proto):
syntax = "proto3"; package order; import "google/protobuf/wrappers.proto"; service OrderManagement { //获取订单 rpc getOrder(google.protobuf.StringValue) returns(Order); //添加订单 rpc addOrder(Order) returns(google.protobuf.StringValue); } message Order { //订单id string id = 1; //订单详情 repeated string items = 2; //订单描述 string description = 3; //订单价格 float price = 4; //订单目的地 string destination = 5; }
然后使用命令 protoc --go_out=plugins=grpc:../order OrderInfo.proto
生成所需要的客户端存根文件(参考前一篇文章)。
然后新建一个 server/main.go 文件,写一下两个业务方法的逻辑:
type server struct { orderMap map[string]*order.Order } //获取订单 func (s *server) GetOrder(ctx context.Context, req *wrappers.StringValue) (resp *order.Order, err error) { resp = &order.Order{} id := req.Value var exist bool if resp, exist = s.orderMap[id]; !exist { err = status.Error(codes.NotFound, "order not found id = " + id) return } return } //添加订单 func (s *server) AddOrder(ctx context.Context, req *order.Order) (resp *wrappers.StringValue, err error) { resp = &wrappers.StringValue{} if s.orderMap == nil { s.orderMap = make(map[string]*order.Order) } v4, err := uuid.NewV4() if err != nil { return resp, status.Errorf(codes.Internal, "gen uuid err", err) } id := v4.String() req.Id = id s.orderMap[id] = req resp.Value = id return }
然后继续在 main.go 中添加一个服务器:
func main() { listener, err := net.Listen("tcp", port) if err != nil { log.Println("net listen err ", err) return } s := grpc.NewServer() order.RegisterOrderManagementServer(s, &server{}) log.Println("start gRPC listen on port " + port) if err := s.Serve(listener); err != nil { log.Println("failed to serve...", err) return } }
接下来继续写客户端的逻辑,调用服务端的两个方法,获取到响应。和上一篇文章类似,新建一个 client/main.go 文件:
const address = "localhost:50051" func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Println("did not connect.", err) return } defer conn.Close() ctx := context.Background() client := order.NewOrderManagementClient(conn) fmt.Println("-----------unary rpc-------------") id := AddOrder(ctx, client) GetOrder(ctx, client, id) } //添加一个订单 func AddOrder(ctx context.Context, client order.OrderManagementClient) string { odr := &order.Order{ Description: "a new order for test-1", Price: 12322.232, Destination: "Shanghai", Items: []string{"doll", "22", "33"}, } val, err := client.AddOrder(ctx, odr) if err != nil { log.Println("add order fail.", err) return "" } log.Println("add order success.id = ", val.String()) return val.Value } //获取一个订单 func GetOrder(ctx context.Context, client order.OrderManagementClient, id string) { val, err := client.GetOrder(ctx, &wrappers.StringValue{Value: id}) if err != nil { log.Println("get order err.", err) return } log.Printf("get order succes. order = %+v", val) }
服务端流 RPC 模式
一元模式只有单一的请求和响应,但是服务器端流 RPC 模式下,服务器在收到客户端的请求后,可能会发送多个响应的序列,这被称为“流”。
在订单服务中,如果客户端根据关键字搜索订单,那么服务器会把所有满足条件的记录返回给客户端,通过这个例子来演示一下这种模式:
首先在 proto 文件中新增一个 rpc 方法:
//搜索订单 rpc searchOrder(google.protobuf.StringValue) returns(stream Order);
注意这里的返回值加上 stream
表示返回的是订单流,然后再使用 protoc
命令重新生成 OrderInfo.pb.go 文件。
然后在 server/main.go 文件中添加搜索订单的逻辑:
//搜索订单 func (s *server) SearchOrder(searchKey *wrappers.StringValue, stream order.OrderManagement_SearchOrderServer) (err error) { for _, val := range s.orderMap { for _, item := range val.Items { if strings.Contains(item, searchKey.Value) { err = stream.Send(val) if err != nil { log.Println("stream send order err.", err) return } break } } } return }
客户端的写法也和之前的有一些区别了:
func SearchOrder(ctx context.Context, client order.OrderManagementClient) { searchKey := "Apple" searchStream, _ := client.SearchOrder(ctx, &wrappers.StringValue{Value: searchKey}) for { val, err := searchStream.Recv() if err == io.EOF { //服务端没有数据了 break } log.Printf("search order from server : %+v", val) } return }
客户端流 RPC 模式
在这种模式下,客户端会发送多个请求给服务端,服务端会发送一条响应到客户端。在订单服务中,我们会更新一批订单,客户端则通过流的的方式传输过来。
还是在 proto 文件中定义一个新的方法:
//更新订单 rpc updateOrder(stream Order) returns(google.protobuf.StringValue);
注意这里在 rpc 的方法入参中多了 stream
关键字,表示是客户端流,然后再使用 protoc
生成文件。
还是老样子,在 server/main.go 中添加更新订单的逻辑:
//更新订单 func (s *server) UpdateOrder(stream order.OrderManagement_UpdateOrderServer) (err error) { updatedIds := "updated order ids : " for { val, err := stream.Recv() if err == io.EOF { //完成读取订单流 //向客户端发送消息 return stream.SendAndClose(&wrappers.StringValue{Value: updatedIds}) } s.orderMap[val.Id] = val log.Println("[server]update the order : ", val.Id) updatedIds += val.Id + ", " }
然后继续在客户端 client/main.go 中添加:
//更新订单 func UpdateOrder(ctx context.Context, client order.OrderManagementClient) { updateStream, _ := client.UpdateOrder(ctx) order1 := &order.Order{Id: "103", Items: []string{"Apple Watch S6"}, Destination: "San Jose, CA", Price: 4400.00} order2 := &order.Order{Id: "105", Items: []string{"Amazon Kindle"}, Destination: "San Jose, CA", Price: 330.00} //更新订单1 if err := updateStream.Send(order1); err != nil { log.Println("send order err.", err) } //更新订单2 if err := updateStream.Send(order2); err != nil { log.Println("send order err.", err) } //关闭流并接收响应 recv, err := updateStream.CloseAndRecv() if err != nil { log.Println("close and recv err.", err) return } log.Printf("the update result : %+v", recv) }
双向流 RPC 模式
双向流 RPC 模式其实就是客户端和服务端流 RPC 的组合,客户端以消息流的方式发送数据,服务端也已消息流的方式响应数据。
在订单服务中,我们可能需要处理订单,客户端发送连续的订单信息,服务端对订单进行发货处理。首先我们在 proto 文件中定义一个新的方法:
//处理订单 rpc processOrder(stream google.protobuf.StringValue) returns(stream CombinedShipment); ... message CombinedShipment { string id = 1; string status = 2; repeated Order orderList = 3; }
然后在 server/main.go 中实现该方法的逻辑:
//处理订单 func (s * server) ProcessOrder(stream order.OrderManagement_ProcessOrderServer) (err error) { var combinedShipmentMap = make(map[string]*order.CombinedShipment) for { val, err := stream.Recv() //接收从客户端发送来的订单 if err == io.EOF { //接收完毕,返回结果 for _, shipment := range combinedShipmentMap { if err := stream.Send(shipment); err != nil { log.Println("[server] process finished!") return err } } break } if err != nil { log.Println(err) break } if val != nil { orderId := val.Value log.Printf("[server]reading order : %+v\n", orderId) dest := s.orderMap[orderId].Destination shipment, exist := combinedShipmentMap[dest] if exist { ord := s.orderMap[orderId] shipment.OrderList = append(shipment.OrderList, ord) combinedShipmentMap[dest] = shipment } else { comShip := &order.CombinedShipment{Id: "cmb - " + (s.orderMap[orderId].Destination), Status: "Processed!", } ord := s.orderMap[orderId] comShip.OrderList = append(comShip.OrderList, ord) combinedShipmentMap[dest] = comShip log.Println(len(comShip.OrderList), comShip.GetId()) } } } return }
这里的处理逻辑比起前面的几个稍微复杂了一点,需要多看下,自己动手敲一下。
然后就是客户端的逻辑:
//处理订单 func ProcessOrder(ctx context.Context, client order.OrderManagementClient) { processStream, _ := client.ProcessOrder(ctx) //发送两个订单处理 if err := processStream.Send(&wrappers.StringValue{Value: "103"}); err != nil { log.Println("send order err.", err) } if err := processStream.Send(&wrappers.StringValue{Value: "105"}); err != nil { log.Println("send order err.", err) } chn := make(chan struct{}) //异步接收服务端的结果 go processResultFromServer(processStream, chn) //再发送一个订单 if err := processStream.Send(&wrappers.StringValue{Value: "106"}); err != nil { log.Println("send order err.", err) } //发送完毕后记得关闭 if err := processStream.CloseSend(); err != nil { log.Println("close send err.", err) } <-chn } //从服务端获取处理的结果 func processResultFromServer(stream order.OrderManagement_ProcessOrderClient, chn chan struct{}) { defer close(chn) for { shipment, err := stream.Recv() if err == io.EOF { log.Println("[client]结束从服务端接收数据") break } log.Printf("[client]server process result : %+v\n", shipment) } }