消息格式
RPC流
服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。
- 调用存根方法
- 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置
content-type
为application/grpc
- 到达服务端,会先检查请求头是不是gRPC请求,否则返回415
长度前缀的消息分帧
在写入消息前,先写入长度消息表明每条消息的大小。
每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB
帧首中还有单字节无符号整数,用来表明数据是否进行了压缩
为1表示使用 message-encoding
中的编码机制进行了压缩
请求消息
客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记
1、对于gRPC 都是POST
2、协议:Http/Https
3、/服务名/方法名
4、目标URI的主机名
5、对不兼容代理的检测,gRPC下这个值必须为 trailers
6、超时时间
7、媒体类型
8、压缩类型
当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧
响应信息
服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers
END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer
三种流
一元RPC
通信时始终只有一个请求和一个响应
protocol buffer
syntax = "proto3"; package hello; // 第一个分割参数,输出路径;第二个设置生成类的包路径 option go_package = "./proto/hello"; // 设置服务名称 service Greeter { // 设置方法 rpc SayHello (HelloRequest) returns (HelloReply) {} } // 请求信息用户名. message HelloRequest { string name = 1; } // 响应信息 message HelloReply { string message = 1; }
服务端
package main import ( "context" "flag" "fmt" "log" "net" "google.golang.org/grpc" pb "mygrpc/proto/hello" ) var ( port = flag.Int("port", 50051, "The server port") ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } // 开启rpc s := grpc.NewServer() // 注册服务 pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
客户端
package main import ( "context" "flag" "log" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pb "mygrpc/proto/hello" // 引入编译生成的包 ) const ( defaultName = "world" ) var ( addr = flag.String("addr", "localhost:50051", "the address to connect to") name = flag.String("name", defaultName, "Name to greet") ) func main() { flag.Parse() // 与服务建立连接. conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() // 创建指定服务的客户端 c := pb.NewGreeterClient(conn) // 连接服务器并打印出其响应。 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // 调用指定方法 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage()) }
服务流RPC
通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。
在protobuf中的 service添加以下代码
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
服务端代码
package main import ( "context" "flag" "fmt" "google.golang.org/grpc" "io" "log" pb "mygrpc/proto/hello" "net" ) var ( port = flag.Int("port", 50051, "The service port") ) type server struct { pb.UnimplementedGreeterServer } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil } func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error { log.Printf("Recved %v", req.GetName()) // 具体返回多少个response根据业务逻辑调整 for i := 0; i < 10; i++ { // 通过 send 方法不断推送数据 err := stream.Send(&pb.HelloReply{}) if err != nil { log.Fatalf("Send error:%v", err) return err } } return nil } func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error { for { log.Println("开始接受客户端的流") // Recv 对客户端发来的请求接收 order, err := stream.Recv() if err == io.EOF { // 流结束,关闭并发送响应给客户端 return stream.Send(&pb.HelloReply{Message: "接受客户流结束"}) } if err != nil { return err } // 更新数据 log.Printf("Order ID : %s - %s", order.GetName(), "Updated") } } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } // 开启rpc s := grpc.NewServer() // 注册服务 pb.RegisterGreeterServer(s, &server{}) log.Printf("service listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
gRPC三种流和消息格式(二)https://developer.aliyun.com/article/1392118