gRPC三种流和消息格式(二)

简介: gRPC三种流和消息格式

gRPC三种流和消息格式(一)https://developer.aliyun.com/article/1392117


客户端代码

package main
import (
  "context"
  "flag"
  "io"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})
  for {
    // 客户端 Recv 方法接收服务端发送的流
    searchOrder, err := searchStream.Recv()
    if err == io.EOF {
      log.Print("EOF")
      break
    }
    if err == nil {
      log.Print("Search Result : ", searchOrder)
    }
  }
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

在protobuf中的 service添加以下代码

rpc updateOrders(stream HelloRequest) returns (stream HelloReply);

服务端代码

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
  ordersStr := "Updated Order IDs : "
  for {
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
    }
    if err != nil {
      return err
    }
    // 更新数据
    orderMap[order.Id] = *order
    log.Printf("Order ID : %s - %s", order.Id, "Updated")
    ordersStr += order.Id + ", "
  }
}

客户端代码

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  updateStream, err := c.UpdateOrders(ctx)
  if err != nil {
    log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)
  }
  // Updating order 1
  if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)
  }
  // Updating order 2
  if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)
  }
  // 发送关闭信号并接收服务端响应
  err = updateStream.CloseSend()
  if err != nil {
    log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
  }
  log.Printf("客户端流传输结束")
}

双工流RPC

对应的业务就比如实时的消息流

protobuf

// 设置双工rpc
  rpc processOrders(stream HelloRequest) returns (stream HelloReply);

服务端

package main
import (
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
  "sync"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {
  var (
    waitGroup sync.WaitGroup // 一组 goroutine 的结束
    // 设置通道
    msgCh = make(chan *pb.HelloReply)
  )
  // 计数器加1
  waitGroup.Add(1)
  // 消费队列中的内容
  go func() {
    // 计数器减一
    defer waitGroup.Done()
    for {
      v := <-msgCh
      fmt.Println(v)
      err := stream.Send(v)
      if err != nil {
        fmt.Println("Send error:", err)
        break
      }
    }
  }()
  waitGroup.Add(1)
  // 向队列中添加内容
  go func() {
    defer waitGroup.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        break
      }
      if err != nil {
        log.Fatalf("recv error:%v", err)
      }
      fmt.Printf("Recved :%v \n", req.GetName())
      msgCh <- &pb.HelloReply{Message: "服务端传输数据"}
    }
    close(msgCh)
  }()
  // 等待 计数器问0 推出
  waitGroup.Wait()
  // 返回nil表示已经完成响应
  return nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "fmt"
  "io"
  "log"
  "sync"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 设置
  var wg sync.WaitGroup
  // 调用指定方法
  stream, _ := c.ProcessOrders(ctx)
  if err != nil {
    panic(err)
  }
  // 3.开两个goroutine 分别用于Recv()和Send()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        fmt.Println("Server Closed")
        break
      }
      if err != nil {
        continue
      }
      fmt.Printf("Recv Data:%v \n", req.GetMessage())
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
      err := stream.Send(&pb.HelloRequest{Name: "hello world"})
      if err != nil {
        log.Printf("send error:%v\n", err)
      }
    }
    // 4. 发送完毕关闭stream
    err := stream.CloseSend()
    if err != nil {
      log.Printf("Send error:%v\n", err)
      return
    }
  }()
  wg.Wait()
  log.Printf("服务端流传输结束")
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

相关文章
|
存储 JSON Rust
【一起学Rust | 进阶篇 | reqwest库】纯 Rust 编写的 HTTP 客户端——reqwest
【一起学Rust | 进阶篇 | reqwest库】纯 Rust 编写的 HTTP 客户端——reqwest
2445 0
|
8月前
|
Web App开发 网络协议 应用服务中间件
HTTP2.0 从原理到实践,保证把你治得服服帖帖!
HTTP/2 是 HTTP/1.1 的重要升级,通过多路复用、头部压缩、服务器推送等特性显著提升性能与效率。本文详细解析了 HTTP/2 的优势、配置方法及实际应用,涵盖 Nginx/Apache/IIS 配置、curl 测试工具使用,并对比 HTTP/1.1 指出其优化点。同时提醒需注意 HTTPS 支持、客户端兼容性等问题,助你高效掌握并运用 HTTP/2 技术。
938 5
HTTP2.0 从原理到实践,保证把你治得服服帖帖!
|
Go
Golang语言之gRPC程序设计示例
这篇文章是关于Golang语言使用gRPC进行程序设计的详细教程,涵盖了RPC协议的介绍、gRPC环境的搭建、Protocol Buffers的使用、gRPC服务的编写和通信示例。
614 3
Golang语言之gRPC程序设计示例
|
算法 人机交互 调度
进程调度算法_轮转调度算法_优先级调度算法_多级反馈队列调度算法
轮转调度算法(RR)是一种常用且简单的调度方法,通过给每个进程分配一小段CPU运行时间来轮流执行。进程切换发生在当前进程完成或时间片用尽时。优先级调度算法则根据进程的紧迫性赋予不同优先级,高优先级进程优先执行,并分为抢占式和非抢占式。多队列调度算法通过设置多个具有不同优先级的就绪队列,采用多级反馈队列优先调度机制,以满足不同类型用户的需求,从而优化整体调度性能。
990 15
|
机器学习/深度学习 存储 人工智能
这7个矢量数据库你应该知道!
这7个矢量数据库你应该知道!
5594 10
|
JavaScript
深入理解汇编中的ZF、OF、SF标志位和条件跳转
深入理解汇编中的ZF、OF、SF标志位和条件跳转
1782 0
|
JavaScript 前端开发 安全
TypeScript的主要优势有哪些?
【6月更文挑战第1天】TypeScript的主要优势有哪些?
430 9
|
人工智能 JSON 安全
Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心
Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心
396 0

热门文章

最新文章