gRPC三种流和消息格式(二)

简介: gRPC三种流和消息格式

gRPC三种流和消息格式(一)https://developer.aliyun.com/article/1392117


客户端代码

package main
import (
  "context"
  "flag"
  "io"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})
  for {
    // 客户端 Recv 方法接收服务端发送的流
    searchOrder, err := searchStream.Recv()
    if err == io.EOF {
      log.Print("EOF")
      break
    }
    if err == nil {
      log.Print("Search Result : ", searchOrder)
    }
  }
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

在protobuf中的 service添加以下代码

rpc updateOrders(stream HelloRequest) returns (stream HelloReply);

服务端代码

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
  ordersStr := "Updated Order IDs : "
  for {
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
    }
    if err != nil {
      return err
    }
    // 更新数据
    orderMap[order.Id] = *order
    log.Printf("Order ID : %s - %s", order.Id, "Updated")
    ordersStr += order.Id + ", "
  }
}

客户端代码

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  updateStream, err := c.UpdateOrders(ctx)
  if err != nil {
    log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)
  }
  // Updating order 1
  if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)
  }
  // Updating order 2
  if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)
  }
  // 发送关闭信号并接收服务端响应
  err = updateStream.CloseSend()
  if err != nil {
    log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
  }
  log.Printf("客户端流传输结束")
}

双工流RPC

对应的业务就比如实时的消息流

protobuf

// 设置双工rpc
  rpc processOrders(stream HelloRequest) returns (stream HelloReply);

服务端

package main
import (
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
  "sync"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {
  var (
    waitGroup sync.WaitGroup // 一组 goroutine 的结束
    // 设置通道
    msgCh = make(chan *pb.HelloReply)
  )
  // 计数器加1
  waitGroup.Add(1)
  // 消费队列中的内容
  go func() {
    // 计数器减一
    defer waitGroup.Done()
    for {
      v := <-msgCh
      fmt.Println(v)
      err := stream.Send(v)
      if err != nil {
        fmt.Println("Send error:", err)
        break
      }
    }
  }()
  waitGroup.Add(1)
  // 向队列中添加内容
  go func() {
    defer waitGroup.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        break
      }
      if err != nil {
        log.Fatalf("recv error:%v", err)
      }
      fmt.Printf("Recved :%v \n", req.GetName())
      msgCh <- &pb.HelloReply{Message: "服务端传输数据"}
    }
    close(msgCh)
  }()
  // 等待 计数器问0 推出
  waitGroup.Wait()
  // 返回nil表示已经完成响应
  return nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "fmt"
  "io"
  "log"
  "sync"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 设置
  var wg sync.WaitGroup
  // 调用指定方法
  stream, _ := c.ProcessOrders(ctx)
  if err != nil {
    panic(err)
  }
  // 3.开两个goroutine 分别用于Recv()和Send()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        fmt.Println("Server Closed")
        break
      }
      if err != nil {
        continue
      }
      fmt.Printf("Recv Data:%v \n", req.GetMessage())
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
      err := stream.Send(&pb.HelloRequest{Name: "hello world"})
      if err != nil {
        log.Printf("send error:%v\n", err)
      }
    }
    // 4. 发送完毕关闭stream
    err := stream.CloseSend()
    if err != nil {
      log.Printf("Send error:%v\n", err)
      return
    }
  }()
  wg.Wait()
  log.Printf("服务端流传输结束")
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

相关文章
|
5月前
gRPC三种流和消息格式(一)
gRPC三种流和消息格式
109 0
|
1月前
|
人工智能 Java
通过okhttp调用SSE流式接口,并将消息返回给客户端
通过okhttp调用SSE流式接口,并将消息返回给客户端
|
5月前
|
网络协议 数据格式
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
|
5月前
|
消息中间件 JSON 监控
Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。
|
7月前
|
网络协议 索引
HTTP/2 协议(帧、消息、流简单的抓包分析)
HTTP/2 协议(帧、消息、流简单的抓包分析)
123 0
|
8月前
|
定位技术 开发工具 Windows
如何在RTSP/RTMP直播过程中加入SEI扩展数据发送和接收解析
在直播系统中,除了直播音视频之外,有时候还想从主播端发布文本信息等,这些信息可以不通过视频传输通道发送给用户播放端,但如果传输的数据想和视频保持精准同步,那最好的办法就是这些信息和视频数据打包在一起传输,并通过h264 sei方式就可以把数据放入h264 Access Unit中传输。
189 0
|
9月前
|
编解码 Linux 定位技术
如何在轻量级RTSP服务支持H.264扩展SEI发送接收自定义数据?
如何在轻量级RTSP服务支持H.264扩展SEI发送接收自定义数据?
114 0
|
存储 JSON 编解码
06-gRPC收发请求过程解析
Google 开发并且开源的一款高性能、跨语言的 RPC 框架,当前支持 C、Java 和 Go。跨语言,通信协议基于HTTP/2,序列化支持 PB(Protocol Buffer)和 JSON。
184 0
|
网络协议 大数据 Go
gRPC(四)基础:gRPC流
Client发送完成后需要手动调用Close()或者CloseSend()方法关闭stream,Server端则return nil就会自动 Close。
625 0
gRPC(四)基础:gRPC流
|
存储 传感器 移动开发
Alink 格式报文发送 | 学习笔记
快速学习 Alink 格式报文发送
117 0
Alink 格式报文发送 | 学习笔记