gRPC三种流和消息格式(一)

简介: gRPC三种流和消息格式

消息格式

RPC流

服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。

  • 调用存根方法
  • 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置content-typeapplication/grpc
  • 到达服务端,会先检查请求头是不是gRPC请求,否则返回415

长度前缀的消息分帧

在写入消息前,先写入长度消息表明每条消息的大小。

每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB

帧首中还有单字节无符号整数,用来表明数据是否进行了压缩

为1表示使用 message-encoding中的编码机制进行了压缩

请求消息

客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记

1、对于gRPC 都是POST

2、协议:Http/Https

3、/服务名/方法名

4、目标URI的主机名

5、对不兼容代理的检测,gRPC下这个值必须为 trailers

6、超时时间

7、媒体类型

8、压缩类型

当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧

响应信息

服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers

END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer

三种流

一元RPC

通信时始终只有一个请求和一个响应

protocol buffer

syntax = "proto3";
package hello;
// 第一个分割参数,输出路径;第二个设置生成类的包路径
option go_package = "./proto/hello";
// 设置服务名称
service Greeter {
  // 设置方法
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 请求信息用户名.
message HelloRequest {
  string name = 1;
}
// 响应信息
message HelloReply {
  string message = 1;
}

服务端

package main
import (
  "context"
  "flag"
  "fmt"
  "log"
  "net"
  "google.golang.org/grpc"
  pb "mygrpc/proto/hello"
)
var (
  port = flag.Int("port", 50051, "The server port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("server listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer conn.Close()
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

服务流RPC

通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。

在protobuf中的 service添加以下代码

rpc searchOrders(google.protobuf.StringValue) returns (stream Order);

服务端代码

package main
import (
  "context"
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
  log.Printf("Recved %v", req.GetName())
  // 具体返回多少个response根据业务逻辑调整
  for i := 0; i < 10; i++ {
    // 通过 send 方法不断推送数据
    err := stream.Send(&pb.HelloReply{})
    if err != nil {
      log.Fatalf("Send error:%v", err)
      return err
    }
  }
  return nil
}
func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error {
  for {
    log.Println("开始接受客户端的流")
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.Send(&pb.HelloReply{Message: "接受客户流结束"})
    }
    if err != nil {
      return err
    }
    // 更新数据
    log.Printf("Order ID : %s - %s", order.GetName(), "Updated")
  }
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}


gRPC三种流和消息格式(二)https://developer.aliyun.com/article/1392118

相关文章
|
存储 负载均衡 Cloud Native
gRPC的原理和实践
gRPC的原理和实践
860 1
gRPC的原理和实践
|
消息中间件 存储 运维
从 Kafka 2.x 到 Kafka 3.x:升级之旅
从 Kafka 2.x 到 Kafka 3.x:升级之旅
2568 2
|
8月前
|
机器学习/深度学习 算法 计算机视觉
《LSTM:视频目标跟踪中时间序列信息的高效利用者》
在视频目标跟踪中,充分利用时间序列信息以提高精度至关重要。长短期记忆网络(LSTM)凭借其独特的门控机制(遗忘门、输入门和输出门)及细胞状态,在处理时间序列数据方面表现出色。遗忘门可丢弃无关信息,输入门整合新特征,输出门筛选关键信息,有效应对目标动态变化与复杂背景干扰。结合目标检测算法如YOLO,LSTM能准确预测目标位置,实现连续稳定的跟踪。
222 14
|
存储 C++
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
856 0
|
11月前
|
JSON 安全 Go
Go语言中使用JWT鉴权、Token刷新完整示例,拿去直接用!
本文介绍了如何在 Go 语言中使用 Gin 框架实现 JWT 用户认证和安全保护。JWT(JSON Web Token)是一种轻量、高效的认证与授权解决方案,特别适合微服务架构。文章详细讲解了 JWT 的基本概念、结构以及如何在 Gin 中生成、解析和刷新 JWT。通过示例代码,展示了如何在实际项目中应用 JWT,确保用户身份验证和数据安全。完整代码可在 GitHub 仓库中查看。
1821 1
|
11月前
|
存储 JSON Java
ELK 圣经:Elasticsearch、Logstash、Kibana 从入门到精通
ELK是一套强大的日志管理和分析工具,广泛应用于日志监控、故障排查、业务分析等场景。本文档将详细介绍ELK的各个组件及其配置方法,帮助读者从零开始掌握ELK的使用。
|
运维 数据安全/隐私保护 Docker
Docker自建仓库之Docker Registry部署实战
关于如何使用Docker Registry镜像搭建本地私有Docker仓库的实战教程,包括了下载镜像、创建授权目录和用户名密码、启动Registry容器、验证端口和容器、测试登录仓库、上传和下载镜像的详细步骤。
3085 5
|
数据处理 Apache 流计算
实时计算引擎 Flink:从入门到深入理解
本篇详细介绍了Apache Flink实时计算引擎的基本概念和核心功能。从入门到深入,逐步介绍了Flink的数据源与接收、数据转换与计算、窗口操作以及状态管理等方面的内容,并附带代码示例进行实际操作演示。通过阅读本文,读者可以建立起对Flink实时计算引擎的全面理解,为实际项目中的实时数据处理提供了有力的指导和实践基础。
4857 2
|
Ubuntu Unix Shell
Nacos 国内镜像
Nacos 国内镜像
2527 1