Go微服务(三)——gRPC详细入门 下

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,182元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Go微服务(三)——gRPC详细入门 下

3. gRPC Stream

这一部分我们本次不作为重点内容,暂时简单过一遍。

3.1 为什么需要Stream RPC

文件上传下载:比如你上传的文件有500个G,你不可能全部存在内存当中,你只有通过流的方式,服务端发一点,客户端接收一点,以这样的方式进行,才不会把服务端的资源给吃满;

web端可能有一些WebSockets方面的需求,那么对应webSockets的后端就是stream的一套rpc来进行支撑;

heartbeat心跳:就比如你的客户端想要上报自己的心跳,那么你可以直接用stream rpc来建立一套,流式接口类似于tcp的一套接口,然后客户端不断的在接口里发送心跳的数据,而不是发一次request,接收一次response这样的方式;你连上之后可以不停的发,服务端也可以不响应你,当你什么时候合适了,服务端会响应你一下数据,因此我们把这种叫做stream模式;

3.2 gRPC Stream rpc定义

09-grpc-stream/service/service.proto

syntax = "proto3";
package hello;
option go_package = "MicroServiceStudy01/09-grpc-stream/service";
message Request{
  string value = 1;
}
message Response{
  string value = 1;
}
// The HelloService service definition
// service 关键字
// HelloService 服务名称 对应接口的名称
// service服务会对应.pb.go文件里interface,里面的rpc对应接口中的函数
service HelloService{
  rpc Hello (Request) returns (Response){}
  rpc Channel(stream Request) returns (stream Response) {}
}

所以定义streaming RPC的语法如下:

rpc <function_name> (stream <type>) returns (stream <type>) {}

关键点:

stream关键字:

这个关键字可以出现在请求上面(stream Request),也可以出现在响应上(stream Response);

如果出现在请求上,就代表是client客户端的一个stream的模式,你的客户端就可以进行一些流模式的上传,比如你有个500G的文件,你的客户端此时就可以把文件读1MB出来发送,然后再读1MB发送到server端;

如果出现在服务端,就代表server服务端的一个stream的模式,你的服务端就可以进行一些流模式的下载,你的服务端可以1MB、1MB的在这个数据流里发送;

3.3 生成 Stream RPC代码

cd ./09-grpc-stream/service
protoc -I . --go_out=. --go_opt=module="MicroServiceStudy01/09-grpc-stream/service" --go-grpc_out=. --go-grpc_opt=module="MicroServiceStudy01/09-grpc-stream/service" ./service.proto

可以看到,流式上传,他的客户端是没有参数的,只有一些选项和我们的上下文信息,会返回一个HelloService_ChannelClient类型的返回值(就是管道),可以用于和服务端进行双向通信。

可以看到在服务端的Channel方法参数是一个新的HelloService_ChannelServer类型 的参数(就是管道),可以用于和客户端双向通信;

3.4 Stream RPC 接口解读

HelloService_ChannelClient和HelloService_ChannelServer接口定义:

// Request  ----->
// Response <-----
type HelloService_ChannelClient interface {
  Send(*Request) error
  Recv() (*Response, error)
  grpc.ClientStream
}
// Request  <-----
// Response ----->
type HelloService_ChannelServer interface {
  Send(*Response) error
  Recv() (*Request, error)
  grpc.ServerStream
}

可以发现服务端和客户端的流辅助接口 均定义了Send和Recv方法 用于流数据的双向通信。

3.5 Stream 服务端

09-grpc-stream/server/server.proto

server端的核心逻辑:

接收一个Request

发送一个Response

type HelloService struct {
  service.UnimplementedHelloServiceServer
}
var _ service.HelloServiceServer = (*HelloService)(nil)
func (p *HelloService) Channel(stream service.HelloService_ChannelServer) error {
  // 服务端在循环中接收客户端发来的数据
  for {
    args, err := stream.Recv()
    if err != nil {
      // 如果遇到io.EOF表示客户端流关闭
      if err == io.EOF {
        return nil
      }
      return err
    }
    // 响应一个请求
    // 生成返回的数据通过流发送给客户端
    resp := &service.Response{
      Value: "hello," + args.GetValue(),
    }
    err = stream.Send(resp)
    if err != nil {
      // 服务端发送异常,函数退出,服务端流关闭
      return err
    }
  }
}
func main() {
  // 1. 构造一个gRPC服务对象
  grpcServer:=grpc.NewServer()
  // 2. 通过gRPC插件生成的RegisterHelloServiceServer 函数注册我们实现的HelloService服务。
  service.RegisterHelloServiceServer(grpcServer,new(HelloService))
  // 3. 监听:1234端口
  listen,err:=net.Listen("tcp",":1234")
  if err!=nil{
    log.Fatal("Listen TCP err:", err)
  }
  // 4. 通过grpcServer.Serve(listen) 在一个监听端口上提供gRPC服务
  grpcServer.Serve(listen)
}

3.6 Stream 客户端

09-grpc-stream/client/client.proto

func main() {
  conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()
  client := service.NewHelloServiceClient(conn)
  // 客户端先调用Channel方法,获取返回的流对象
  stream, err01 := client.Channel(context.Background())
  if err01 != nil {
    log.Fatal(err01)
  }
  // 在客户端我们将发送和接收放到两个独立的Goroutine
  // 首先向服务端发送数据:
  go func() {
    for {
      req := &service.Request{
        Value: "小明",
      }
      if err := stream.Send(req); err != nil {
        log.Fatal(err)
      }
      time.Sleep(time.Second)
    }
  }()
  // 然后再循环中接收服务端返回的数据
  for {
    reply, err := stream.Recv()
    if err != nil {
      if err == io.EOF {
        break
      }
      log.Fatal(err)
    }
    fmt.Println(reply.GetValue())
  }
}

4. gRPC 中间件(认证)

前面我们的rpc都存在一些缺陷,我们的rpc在调用的时候是裸着的,任何一个有这个客户端的人都可以调用,就相当于大门常打开,所以我们需要为他加一把锁。

但是加锁的话有很多方式,比如颁发证书,我为你这个客户端颁发了证书,你才可以调用,这也是最常用的方式,不过这里我们没有这样处理,而是写的一个认证中间件:

4.1 中间件RPC接口解读

4.1.1 正常模式

// UnaryServerInterceptor 提供了一个钩子来拦截服务器上一元RPC的执行。
// info 包含拦截器可以操作的RPC的所有信息。
// handler 是服务方法实现的包装器.
// 拦截器负责调用 handler 来完成RPC。
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
  • ctx:请求上下文
  • req:rpc请求数据
  • info:服务端相关数据,不用理解这个
  • handler:处理请求的handler,相对于next()
  • resp:rpc相应数据
  • err:rpc错误

4.1.2 Stream 流模式

// StreamServerInterceptor 提供了一个钩子来拦截服务器上 流RPC 的执行。
// info 包含拦截器可以操作的RPC的所有信息。
// handler是服务方法实现的包装器.
// 拦截器负责调用处理程序来完成RPC。
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
  • srv:service信息
  • ss:Server数据流
  • info:服务端相关数据,不用理解这个
  • handler:处理请求的handler,相对于next()
  • err:rpc错误

4.2 代码:

4.2.1 protobuf文件

syntax = "proto3";
package hello;
option go_package = "MicroServiceStudy01/10-grpc-auth/service";
message Request{
  string value = 1;
}
message Response{
  string value = 1;
}
// The HelloService service definition
// service 关键字
// HelloService 服务名称 对应接口的名称
// service服务会对应.pb.go文件里interface,里面的rpc对应接口中的函数
service HelloService{
  rpc Hello (Request) returns (Response){}
  rpc Channel(stream Request) returns (stream Response) {}
}

生成代码:

cd ./10-grpc-auth/service
protoc -I . --go_out=. --go_opt=module="MicroServiceStudy01/10-grpc-auth/service" --go-grpc_out=. --go-grpc_opt=module="Mic
roServiceStudy01/10-grpc-auth/service" ./service.proto 

4.2.2 认证中间件代码

1. 客户端

10-grpc-auth/auther/client.go

客户端的核心逻辑就是从meta中取clientId,clientSecret;

package author
import "context"
type Authentication struct {
  clientId     string
  clientSecret string
}
// NewClientAuthentication 构造凭证
func NewClientAuthentication(clientId, clientSecret string) *Authentication {
  return &Authentication{
    clientId:     clientId,
    clientSecret: clientSecret,
  }
}
// WithClientCredentials 通过客户端初始化凭证
func (a *Authentication) WithClientCredentials(clientId, clientSecret string) {
  a.clientId = clientId
  a.clientSecret = clientSecret
}
// GetRequestMetadata 从meta中取凭证信息:clientId,clientSecret
func (a *Authentication) GetRequestMetadata(context.Context, ...string) (
  map[string]string, error) {
  return map[string]string{
    "client_id":     a.clientId,
    "client_secret": a.clientSecret,
  }, nil
}
// RequireTransportSecurity 指示凭据是否需要传输安全性。
func (a *Authentication) RequireTransportSecurity() bool {
  return false
}
2. 服务端

10-grpc-auth/auther/server.go

package author
import (
  "context"
  "fmt"
  "github.com/infraboard/mcube/logger"
  "github.com/infraboard/mcube/logger/zap"
  "google.golang.org/grpc"
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/metadata"
  "google.golang.org/grpc/status"
)
const (
  ClientHeaderKey = "client_id"
  ClientSecretKey = "client_secret"
)
type grpcAuthor struct {
  log logger.Logger
}
// 构造并初始化 grpc author
func newGrpcAuthor() *grpcAuthor {
  return &grpcAuthor{
    // zap.L(): 返回一个未命名的全局 logger。
    // .Named(): 添加一个新的路径段 到 logger 的 名称 。段由句点连接。
    log: zap.L().Named("Grpc Author"),
  }
}
// GetClientCredentialsFromMeta 从客户端发来的请求中获取凭证信息
func (a *grpcAuthor) GetClientCredentialsFromMeta(md metadata.MD) (
  clientId, clientSecret string) {
  cids := md.Get(ClientHeaderKey)
  sids := md.Get(ClientSecretKey)
  if len(cids) > 0 {
    clientId = cids[0]
  }
  if len(sids) > 0 {
    clientSecret = sids[0]
  }
  return
}
// 验证凭证信息
func (a *grpcAuthor) validateServiceCredential(clientId, clientSecret string) error {
  if clientId == "" && clientSecret == "" {
    return status.Errorf(codes.Unauthenticated, "client_id or client_secret is \"\"")
  }
  if !(clientId == "admin" && clientSecret == "123456") {
    return status.Errorf(codes.Unauthenticated, "client_id or client_secret invalidate")
  }
  return nil
}
// Auth 普通模式的拦截器
func (a *grpcAuthor) Auth(
  ctx context.Context,
  req interface{},
  info *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (resp interface{}, err error) {
  // 从上下文中获取认证信息
  md, ok := metadata.FromIncomingContext(ctx)
  if !ok {
    return nil, fmt.Errorf("ctx is not an grpc incoming context!")
  }
  fmt.Println("grpc header info :", md)
  // 获取客户端凭证信息
  clientId, clientSecret := a.GetClientCredentialsFromMeta(md)
  // 校验调用的客户端携带的凭证是否有效
  if err := a.validateServiceCredential(clientId, clientSecret); err != nil {
    return nil, err
  }
  resp, err = handler(ctx, req)
  return resp, err
}
// StreamAuth 流模式的拦截器
func (a *grpcAuthor) StreamAuth(
  srv interface{},
  ss grpc.ServerStream,
  info *grpc.StreamServerInfo,
  handler grpc.StreamHandler,
) (err error) {
  fmt.Println(srv, info)
  // 从上下文中获取认证信息
  // https://www.bilibili.com/video/BV1mi4y1d7SL?p=3&t=5032.0
  md, ok := metadata.FromIncomingContext(ss.Context())
  if !ok {
    return fmt.Errorf("ctx is not an grpc incoming context!")
  }
  fmt.Println("grpc header info :", md)
  // 获取客户端凭证
  clientId, clientSecret := a.GetClientCredentialsFromMeta(md)
  // 校验调用的客户端凭证是否有效
  if err := a.validateServiceCredential(clientId, clientSecret); err != nil {
    return err
  }
  return handler(srv, ss)
}
func GrpcAuthUnaryServerInterceptor() grpc.UnaryServerInterceptor {
  return newGrpcAuthor().Auth
}
func GrpcAuthStreamServerInterceptor() grpc.StreamServerInterceptor {
  return newGrpcAuthor().StreamAuth
}

4.2.1 客户端代码

10-grpc-auth/client/client.go

package main
import (
  "MicroServiceStudy01/10-grpc-auth/author"
  "MicroServiceStudy01/10-grpc-auth/service"
  "context"
  "fmt"
  "google.golang.org/grpc"
  "log"
)
func main() {
  conn, err := grpc.Dial("localhost:1234",
    grpc.WithInsecure(),
    grpc.WithPerRPCCredentials(author.NewClientAuthentication("admin", "123456")),
  )
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()
  // NewHelloServiceClient函数是xxx_grpc.pb.go中自动生成的函数,
  // 基于已经建立的连接构造HelloServiceClient对象,
  // 返回的client其实是一个HelloServiceClient接口对象
  //
  client := service.NewHelloServiceClient(conn)
  // 通过接口定义的方法就可以调用服务端对应gRPC服务提供的方法
  req := &service.Request{Value: "小亮"}
  reply, err01 := client.Hello(context.Background(), req)
  if err01 != nil {
    log.Fatal(err01)
  }
  fmt.Println(reply.GetValue())
}

4.2.3 服务端代码

10-grpc-auth/server/server.go

package main
import (
  "MicroServiceStudy01/10-grpc-auth/author"
  "MicroServiceStudy01/10-grpc-auth/service"
  "context"
  "google.golang.org/grpc"
  "log"
  "net"
)
type HelloService struct {
  // UnimplementedHelloServiceServer这个结构体是必须要内嵌进来的
  // 也就是说我们定义的这个结构体对象必须继承UnimplementedHelloServiceServer。
  // 嵌入之后,我们就已经实现了GRPC这个服务的接口,但是实现之后我们什么都没做,没有写自己的业务逻辑,
  // 我们要重写实现的这个接口里的函数,这样才能提供一个真正的rpc的能力。
  service.UnimplementedHelloServiceServer
}
var _ service.HelloServiceServer = (*HelloService)(nil)
// Hello 重写实现的接口里的Hello函数
func (p *HelloService) Hello(ctx context.Context, req *service.Request) (*service.Response, error){
  resp := &service.Response{}
  resp.Value = "hello:" + req.Value
  return resp, nil
}
func main() {
  // 1. 构造一个gRPC服务对象
  grpcServer:=grpc.NewServer(
    // 添加认证中间件,如果有多个中间件需要添加,使用ChainUnaryInterceptor
    grpc.UnaryInterceptor(author.GrpcAuthUnaryServerInterceptor()),
    // 添加stream API拦截器
    grpc.StreamInterceptor(author.GrpcAuthStreamServerInterceptor()),
  )
  // 2.通过gRPC插件生成的RegisterHelloServiceServer 函数注册我们实现的HelloService服务。
  service.RegisterHelloServiceServer(grpcServer,new(HelloService))
  // 3. 监听:1234端口
  listen,err:=net.Listen("tcp",":1234")
  if err!=nil{
    log.Fatal("Listen TCP err:", err)
  }
  // 4. 通过grpcServer.Serve(listen) 在一个监听端口上提供gRPC服务
  grpcServer.Serve(listen)
}


相关文章
|
2月前
|
监控 算法 NoSQL
Go 微服务限流与熔断最佳实践:滑动窗口、令牌桶与自适应阈值
🌟蒋星熠Jaxonic:Go微服务限流熔断实践者。分享基于滑动窗口、令牌桶与自适应阈值的智能防护体系,助力高并发系统稳定运行。
Go 微服务限流与熔断最佳实践:滑动窗口、令牌桶与自适应阈值
|
2月前
|
Cloud Native 安全 Java
Go语言深度解析:从入门到精通的完整指南
🌟蒋星熠Jaxonic,Go语言探索者。深耕云计算、微服务与并发编程,以代码为笔,在二进制星河中书写极客诗篇。分享Go核心原理、性能优化与实战架构,助力开发者掌握云原生时代利器。#Go语言 #并发编程 #性能优化
421 43
Go语言深度解析:从入门到精通的完整指南
|
4月前
|
JSON 自然语言处理 API
gRPC凭什么成为微服务通信首选?深度解析RPC进化史
本文深入解析了分布式系统中服务通信的核心机制,重点介绍了 RPC 与 gRPC 的原理、优势及使用场景,并详解 gRPC 所依赖的序列化协议 Protocol Buffers(Protobuf)。内容涵盖 RPC 概念、gRPC 特性、Protobuf 语法及服务定义,适合微服务架构设计与维护人员阅读,助你构建高性能、低耦合的服务通信体系。
566 73
gRPC凭什么成为微服务通信首选?深度解析RPC进化史
|
7月前
|
人工智能 安全 算法
Go入门实战:并发模式的使用
本文详细探讨了Go语言的并发模式,包括Goroutine、Channel、Mutex和WaitGroup等核心概念。通过具体代码实例与详细解释,介绍了这些模式的原理及应用。同时分析了未来发展趋势与挑战,如更高效的并发控制、更好的并发安全及性能优化。Go语言凭借其优秀的并发性能,在现代编程中备受青睐。
238 33
|
3月前
|
Cloud Native 安全 Java
Go语言深度解析:从入门到精通的完整指南
🌟 蒋星熠Jaxonic,执着的星际旅人,用Go语言编写代码诗篇。🚀 Go语言以简洁、高效、并发为核心,助力云计算与微服务革新。📚 本文详解Go语法、并发模型、性能优化与实战案例,助你掌握现代编程精髓。🌌 从goroutine到channel,从内存优化到高并发架构,全面解析Go的强大力量。🔧 实战构建高性能Web服务,展现Go在云原生时代的无限可能。✨ 附技术对比、最佳实践与生态全景,带你踏上Go语言的星辰征途。#Go语言 #并发编程 #云原生 #性能优化
|
JavaScript Java Go
探索Go语言在微服务架构中的优势
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出。本文将深入探讨Go语言在构建微服务时的性能优势,包括其在内存管理、网络编程、并发模型以及工具链支持方面的特点。通过对比其他流行语言,我们将揭示Go语言如何成为微服务架构中的一股清流。
291 53
|
6月前
|
Java API 微服务
Java 21 与 Spring Boot 3.2 微服务开发从入门到精通实操指南
《Java 21与Spring Boot 3.2微服务开发实践》摘要: 本文基于Java 21和Spring Boot 3.2最新特性,通过完整代码示例展示了微服务开发全流程。主要内容包括:1) 使用Spring Initializr初始化项目,集成Web、JPA、H2等组件;2) 配置虚拟线程支持高并发;3) 采用记录类优化DTO设计;4) 实现JPA Repository与Stream API数据访问;5) 服务层整合虚拟线程异步处理和结构化并发;6) 构建RESTful API并使用Springdoc生成文档。文中特别演示了虚拟线程配置(@Async)和StructuredTaskSco
721 0
|
10月前
|
Shell Go 开发工具
【环境】Rocky8使用gvm配置Go多版本管理的微服务开发环境(go-zero)
通过本文的介绍,我们详细讲解了如何在Rocky8上使用gvm来管理多个Go版本,并配置go-zero框架的开发环境。通过gvm的灵活管理,开发者可以轻松切换不同的Go版本,以适应不同项目的需求。同时,go-zero框架的使用进一步提升了微服务开发的效率和质量。希望本文能帮助开发者构建高效的Go语言开发环境,提高项目开发的灵活性和稳定性。
314 63
|
8月前
|
存储 算法 数据可视化
【二叉树遍历入门:从中序遍历到层序与右视图】【LeetCode 热题100】94:二叉树的中序遍历、102:二叉树的层序遍历、199:二叉树的右视图(详细解析)(Go语言版)
本文详细解析了二叉树的三种经典遍历方式:中序遍历(94题)、层序遍历(102题)和右视图(199题)。通过递归与迭代实现中序遍历,深入理解深度优先搜索(DFS);借助队列完成层序遍历和右视图,掌握广度优先搜索(BFS)。文章对比DFS与BFS的思维方式,总结不同遍历的应用场景,为后续构造树结构奠定基础。
401 10
|
10月前
|
存储 Go
Go 语言入门指南:切片
Golang中的切片(Slice)是基于数组的动态序列,支持变长操作。它由指针、长度和容量三部分组成,底层引用一个连续的数组片段。切片提供灵活的增减元素功能,语法形式为`[]T`,其中T为元素类型。相比固定长度的数组,切片更常用,允许动态调整大小,并且多个切片可以共享同一底层数组。通过内置的`make`函数可创建指定长度和容量的切片。需要注意的是,切片不能直接比较,只能与`nil`比较,且空切片的长度为0。
253 3
Go 语言入门指南:切片