gRPC三种流和消息格式(二)

简介: gRPC三种流和消息格式

gRPC三种流和消息格式(一)https://developer.aliyun.com/article/1392117


客户端代码

package main
import (
  "context"
  "flag"
  "io"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})
  for {
    // 客户端 Recv 方法接收服务端发送的流
    searchOrder, err := searchStream.Recv()
    if err == io.EOF {
      log.Print("EOF")
      break
    }
    if err == nil {
      log.Print("Search Result : ", searchOrder)
    }
  }
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

在protobuf中的 service添加以下代码

rpc updateOrders(stream HelloRequest) returns (stream HelloReply);

服务端代码

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
  ordersStr := "Updated Order IDs : "
  for {
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
    }
    if err != nil {
      return err
    }
    // 更新数据
    orderMap[order.Id] = *order
    log.Printf("Order ID : %s - %s", order.Id, "Updated")
    ordersStr += order.Id + ", "
  }
}

客户端代码

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  updateStream, err := c.UpdateOrders(ctx)
  if err != nil {
    log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)
  }
  // Updating order 1
  if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)
  }
  // Updating order 2
  if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)
  }
  // 发送关闭信号并接收服务端响应
  err = updateStream.CloseSend()
  if err != nil {
    log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
  }
  log.Printf("客户端流传输结束")
}

双工流RPC

对应的业务就比如实时的消息流

protobuf

// 设置双工rpc
  rpc processOrders(stream HelloRequest) returns (stream HelloReply);

服务端

package main
import (
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
  "sync"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {
  var (
    waitGroup sync.WaitGroup // 一组 goroutine 的结束
    // 设置通道
    msgCh = make(chan *pb.HelloReply)
  )
  // 计数器加1
  waitGroup.Add(1)
  // 消费队列中的内容
  go func() {
    // 计数器减一
    defer waitGroup.Done()
    for {
      v := <-msgCh
      fmt.Println(v)
      err := stream.Send(v)
      if err != nil {
        fmt.Println("Send error:", err)
        break
      }
    }
  }()
  waitGroup.Add(1)
  // 向队列中添加内容
  go func() {
    defer waitGroup.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        break
      }
      if err != nil {
        log.Fatalf("recv error:%v", err)
      }
      fmt.Printf("Recved :%v \n", req.GetName())
      msgCh <- &pb.HelloReply{Message: "服务端传输数据"}
    }
    close(msgCh)
  }()
  // 等待 计数器问0 推出
  waitGroup.Wait()
  // 返回nil表示已经完成响应
  return nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "fmt"
  "io"
  "log"
  "sync"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 设置
  var wg sync.WaitGroup
  // 调用指定方法
  stream, _ := c.ProcessOrders(ctx)
  if err != nil {
    panic(err)
  }
  // 3.开两个goroutine 分别用于Recv()和Send()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        fmt.Println("Server Closed")
        break
      }
      if err != nil {
        continue
      }
      fmt.Printf("Recv Data:%v \n", req.GetMessage())
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
      err := stream.Send(&pb.HelloRequest{Name: "hello world"})
      if err != nil {
        log.Printf("send error:%v\n", err)
      }
    }
    // 4. 发送完毕关闭stream
    err := stream.CloseSend()
    if err != nil {
      log.Printf("Send error:%v\n", err)
      return
    }
  }()
  wg.Wait()
  log.Printf("服务端流传输结束")
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

目录
打赏
0
0
0
0
5
分享
相关文章
|
9月前
gRPC三种流和消息格式(一)
gRPC三种流和消息格式
297 0
|
8月前
|
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
366 0
|
8月前
|
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
81 0
|
8月前
|
C++
gRPC 四模式之 服务器端流RPC模式
gRPC 四模式之 服务器端流RPC模式
160 0
java流是指在Java中用来读写数据的一组有序的数据序列,它可以将数据从一个地方带到另一个地方。java流分为输入流和输出流,输入流是从源读取数据的流,而输出流是将数据写入到目的地的流。Java流又可以分为字节流和字符流,字节流读取的最小单位是一个字节(1byte=8bit),而字符流一次可以读取一个字符(1char = 2byte = 16bit)。Java流还可以分为节点流和处理流,节点流是直接从一个源读写数据的流(这个流没有经过包装和修饰),处理流是在对节点流封装的基础上的一种流。
143 0
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
开始使用流
Java 8 中的 Stream 俗称为流,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念 Stream 用于对集合对象进行各种非常便利、高效的聚合操作,或者大批量数据操作 Stream API 借助于 Lambda 表达式,极大的提高编程效率和程序可读性 同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势 通过下面的例子我们可以初步体会到使用 Stream 处理集合的便利性
69 1
I/O流
IO流:I的全称是Input,O的全称是Output。表示读取,流可以看做是程序传输数据的通道。 作用:解决程序请求资源,输出资源的问题。
65 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等