gRPC三种流和消息格式(一)https://developer.aliyun.com/article/1392117
客户端代码
package main import ( "context" "flag" "io" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "mygrpc/proto/hello" // 引入编译生成的包 ) const ( defaultName = "world" ) var ( addr = flag.String("addr", "localhost:50051", "the address to connect to") name = flag.String("name", defaultName, "Name to greet") ) func main() { flag.Parse() // 与服务建立连接. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { } }(conn) // 创建指定服务的客户端 c := pb.NewGreeterClient(conn) // 连接服务器并打印出其响应。 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // 调用指定方法 searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"}) for { // 客户端 Recv 方法接收服务端发送的流 searchOrder, err := searchStream.Recv() if err == io.EOF { log.Print("EOF") break } if err == nil { log.Print("Search Result : ", searchOrder) } } r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage()) }
客户流RPC
客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果
在protobuf中的 service添加以下代码
rpc updateOrders(stream HelloRequest) returns (stream HelloReply);
服务端代码
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error { ordersStr := "Updated Order IDs : " for { // Recv 对客户端发来的请求接收 order, err := stream.Recv() if err == io.EOF { // 流结束,关闭并发送响应给客户端 return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr}) } if err != nil { return err } // 更新数据 orderMap[order.Id] = *order log.Printf("Order ID : %s - %s", order.Id, "Updated") ordersStr += order.Id + ", " } }
客户端代码
package main import ( "context" "flag" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "mygrpc/proto/hello" // 引入编译生成的包 ) const ( defaultName = "world" ) var ( addr = flag.String("addr", "localhost:50051", "the address to connect to") name = flag.String("name", defaultName, "Name to greet") ) func main() { flag.Parse() // 与服务建立连接. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { } }(conn) // 创建指定服务的客户端 c := pb.NewGreeterClient(conn) // 连接服务器并打印出其响应。 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // 调用指定方法 updateStream, err := c.UpdateOrders(ctx) if err != nil { log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err) } // Updating order 1 if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil { log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err) } // Updating order 2 if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil { log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err) } // 发送关闭信号并接收服务端响应 err = updateStream.CloseSend() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil) } log.Printf("客户端流传输结束") }
双工流RPC
对应的业务就比如实时的消息流
protobuf
// 设置双工rpc rpc processOrders(stream HelloRequest) returns (stream HelloReply);
服务端
package main import ( "flag" "fmt" "google.golang.org/grpc" "io" "log" pb "mygrpc/proto/hello" "net" "sync" ) var ( port = flag.Int("port", 50051, "The service port") ) type server struct { pb.UnimplementedGreeterServer } func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error { var ( waitGroup sync.WaitGroup // 一组 goroutine 的结束 // 设置通道 msgCh = make(chan *pb.HelloReply) ) // 计数器加1 waitGroup.Add(1) // 消费队列中的内容 go func() { // 计数器减一 defer waitGroup.Done() for { v := <-msgCh fmt.Println(v) err := stream.Send(v) if err != nil { fmt.Println("Send error:", err) break } } }() waitGroup.Add(1) // 向队列中添加内容 go func() { defer waitGroup.Done() for { req, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatalf("recv error:%v", err) } fmt.Printf("Recved :%v \n", req.GetName()) msgCh <- &pb.HelloReply{Message: "服务端传输数据"} } close(msgCh) }() // 等待 计数器问0 推出 waitGroup.Wait() // 返回nil表示已经完成响应 return nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } // 开启rpc s := grpc.NewServer() // 注册服务 pb.RegisterGreeterServer(s, &server{}) log.Printf("service listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
客户端
package main import ( "context" "flag" "fmt" "io" "log" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "mygrpc/proto/hello" // 引入编译生成的包 ) const ( defaultName = "world" ) var ( addr = flag.String("addr", "localhost:50051", "the address to connect to") name = flag.String("name", defaultName, "Name to greet") ) func main() { flag.Parse() // 与服务建立连接. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { } }(conn) // 创建指定服务的客户端 c := pb.NewGreeterClient(conn) // 连接服务器并打印出其响应。 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // 设置 var wg sync.WaitGroup // 调用指定方法 stream, _ := c.ProcessOrders(ctx) if err != nil { panic(err) } // 3.开两个goroutine 分别用于Recv()和Send() wg.Add(1) go func() { defer wg.Done() for { req, err := stream.Recv() if err == io.EOF { fmt.Println("Server Closed") break } if err != nil { continue } fmt.Printf("Recv Data:%v \n", req.GetMessage()) } }() wg.Add(1) go func() { defer wg.Done() for i := 0; i < 2; i++ { err := stream.Send(&pb.HelloRequest{Name: "hello world"}) if err != nil { log.Printf("send error:%v\n", err) } } // 4. 发送完毕关闭stream err := stream.CloseSend() if err != nil { log.Printf("Send error:%v\n", err) return } }() wg.Wait() log.Printf("服务端流传输结束") }
代码仓库
https://github.com/onenewcode/mygrpc.git
也可以直接下载绑定的资源。