书接上文,我们继续实现剩余的两种方式--客户端流式 RPC、双向流式 RPC。
Client-side streaming RPC:客户端流式 RPC、
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求到服务端,服务端发起一次响应给客户端
Proto :
syntax = "proto3"; package proto; message String { string value = 1; } service HelloService { rpc Hello (stream String) returns (String){}; } 复制代码
server:
package main import ( "google.golang.org/grpc" "io" "log" "net" pb "rpc/proto" // 设置引用别名 ) // HelloServiceImpl 定义我们的服务 type HelloServiceImpl struct{} //实现Hello方法 func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error { for { resp, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.String{Value:"say.hello"}) } if err != nil { return err } log.Printf("resp: %v", resp) } return nil } func main() { // 新建gRPC服务器实例 grpcServer := grpc.NewServer() // 在gRPC服务器注册我们的服务 pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) lis, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } log.Println(" net.Listing...") //用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用 err = grpcServer.Serve(lis) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } } 复制代码
如上,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要通过 stream.SendAndClose 方法将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv。
client:
package main import ( "context" "google.golang.org/grpc" "log" pb "rpc/proto" // 设置引用别名 ) // SayHello 调用服务端的 Hello 方法 func SayHello(client pb.HelloServiceClient, r *pb.String) error { stream, _ := client.Hello(context.Background()) for n := 0; n < 6; n++ { _ = stream.Send(r) } resp, _ := stream.CloseAndRecv() log.Printf("resp err: %v", resp) return nil } func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal("dialing err:", err) } defer conn.Close() // 建立gRPC连接 client := pb.NewHelloServiceClient(conn) // 创建发送结构体 req := pb.String{ Value: "stream server grpc ", } SayHello(client, &req) } 复制代码
在 Server 端的 stream.CloseAndRecv,与 Client 端 stream.SendAndClose 是配套使用的方法。
开启服务器端,开启客户端。执行结果如下:
$ go run server.go 2021/11/17 13:26:34 net.Listing... 2021/11/17 13:26:44 resp: value:"stream server grpc " 2021/11/17 13:26:44 resp: value:"stream server grpc " 2021/11/17 13:26:44 resp: value:"stream server grpc " 2021/11/17 13:26:44 resp: value:"stream server grpc " 2021/11/17 13:26:44 resp: value:"stream server grpc " 2021/11/17 13:26:44 resp: value:"stream server grpc " 复制代码
$ go run client.go 2021/11/17 13:26:44 resp err: value:"say.hello" 复制代码
Bidirectional streaming RPC:双向流式 RPC
双向流式 RPC,由客户端以流式的方式发起请求,服务端也以流式的方式响应请求。
首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)。
Proto :
syntax = "proto3"; package proto; message String { string value = 1; } service HelloService { rpc Hello (stream String) returns (stream String){}; } 复制代码
server:
package main import ( "google.golang.org/grpc" "io" "log" "net" pb "rpc/proto" // 设置引用别名 ) // HelloServiceImpl 定义我们的服务 type HelloServiceImpl struct{} //实现Hello方法 func (p *HelloServiceImpl) Hello(stream pb.HelloService_HelloServer) error { for { _ = stream.Send(&pb.String{Value: "say.hello"}) resp, err := stream.Recv() //接收完了返回 if err == io.EOF { return nil } if err != nil { return err } log.Printf("resp: %v", resp) } } func main() { // 新建gRPC服务器实例 grpcServer := grpc.NewServer() // 在gRPC服务器注册我们的服务 pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) lis, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } log.Println(" net.Listing...") err = grpcServer.Serve(lis) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } } 复制代码
client:
package main import ( "context" "google.golang.org/grpc" "io" "log" pb "rpc/proto" // 设置引用别名 ) // SayHello 调用服务端的 Hello 方法 func SayHello(client pb.HelloServiceClient, r *pb.String) error { stream, _ := client.Hello(context.Background()) for n := 0; n <= 3; n++ { _ = stream.Send(r) resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("resp err: %v", resp) } _ = stream.CloseSend() return nil } func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal("dialing err:", err) } defer conn.Close() // 建立gRPC连接 client := pb.NewHelloServiceClient(conn) // 创建发送结构体 req := pb.String{ Value: "stream server grpc ", } SayHello(client, &req) } 复制代码
服务端在循环中接收客户端发来的数据,如果遇到io.EOF表示客户端流被关闭,如果函数退出表示服 务端流关闭。生成返回的数据通过流发送给客户端,双向流数据的发送和接收都是完全独立的行为。需 要注意的是,发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码。
开启服务器端,开启客户端。执行结果如下:
$ go run server.go 2021/11/17 15:46:10 net.Listing... 2021/11/17 15:46:19 resp: value:"stream server grpc " 2021/11/17 15:46:19 resp: value:"stream server grpc " 2021/11/17 15:46:19 resp: value:"stream server grpc " 2021/11/17 15:46:19 resp: value:"stream server grpc " 复制代码
$ go run client.go 2021/11/17 15:46:19 resp err: value:"say.hello" 2021/11/17 15:46:19 resp err: value:"say.hello" 2021/11/17 15:46:19 resp err: value:"say.hello" 2021/11/17 15:46:19 resp err: value:"say.hello"