3. gRPC Stream
这一部分我们本次不作为重点内容,暂时简单过一遍。
3.1 为什么需要Stream RPC
文件上传下载:比如你上传的文件有500个G,你不可能全部存在内存当中,你只有通过流的方式,服务端发一点,客户端接收一点,以这样的方式进行,才不会把服务端的资源给吃满;
web端可能有一些WebSockets方面的需求,那么对应webSockets的后端就是stream的一套rpc来进行支撑;
heartbeat心跳:就比如你的客户端想要上报自己的心跳,那么你可以直接用stream rpc来建立一套,流式接口类似于tcp的一套接口,然后客户端不断的在接口里发送心跳的数据,而不是发一次request,接收一次response这样的方式;你连上之后可以不停的发,服务端也可以不响应你,当你什么时候合适了,服务端会响应你一下数据,因此我们把这种叫做stream模式;
3.2 gRPC Stream rpc定义
09-grpc-stream/service/service.proto
syntax = "proto3"; package hello; option go_package = "MicroServiceStudy01/09-grpc-stream/service"; message Request{ string value = 1; } message Response{ string value = 1; } // The HelloService service definition // service 关键字 // HelloService 服务名称 对应接口的名称 // service服务会对应.pb.go文件里interface,里面的rpc对应接口中的函数 service HelloService{ rpc Hello (Request) returns (Response){} rpc Channel(stream Request) returns (stream Response) {} }
所以定义streaming RPC的语法如下:
rpc <function_name> (stream <type>) returns (stream <type>) {}
关键点:
stream关键字:
这个关键字可以出现在请求上面(stream Request),也可以出现在响应上(stream Response);
如果出现在请求上,就代表是client客户端的一个stream的模式,你的客户端就可以进行一些流模式的上传,比如你有个500G的文件,你的客户端此时就可以把文件读1MB出来发送,然后再读1MB发送到server端;
如果出现在服务端,就代表server服务端的一个stream的模式,你的服务端就可以进行一些流模式的下载,你的服务端可以1MB、1MB的在这个数据流里发送;
3.3 生成 Stream RPC代码
cd ./09-grpc-stream/service protoc -I . --go_out=. --go_opt=module="MicroServiceStudy01/09-grpc-stream/service" --go-grpc_out=. --go-grpc_opt=module="MicroServiceStudy01/09-grpc-stream/service" ./service.proto
可以看到,流式上传,他的客户端是没有参数的,只有一些选项和我们的上下文信息,会返回一个HelloService_ChannelClient
类型的返回值(就是管道),可以用于和服务端进行双向通信。
可以看到在服务端的Channel方法参数是一个新的HelloService_ChannelServer类型 的参数(就是管道),可以用于和客户端双向通信;
3.4 Stream RPC 接口解读
HelloService_ChannelClient和HelloService_ChannelServer接口定义:
// Request -----> // Response <----- type HelloService_ChannelClient interface { Send(*Request) error Recv() (*Response, error) grpc.ClientStream } // Request <----- // Response -----> type HelloService_ChannelServer interface { Send(*Response) error Recv() (*Request, error) grpc.ServerStream }
可以发现服务端和客户端的流辅助接口 均定义了Send和Recv方法 用于流数据的双向通信。
3.5 Stream 服务端
09-grpc-stream/server/server.proto
server端的核心逻辑:
接收一个Request
发送一个Response
type HelloService struct { service.UnimplementedHelloServiceServer } var _ service.HelloServiceServer = (*HelloService)(nil) func (p *HelloService) Channel(stream service.HelloService_ChannelServer) error { // 服务端在循环中接收客户端发来的数据 for { args, err := stream.Recv() if err != nil { // 如果遇到io.EOF表示客户端流关闭 if err == io.EOF { return nil } return err } // 响应一个请求 // 生成返回的数据通过流发送给客户端 resp := &service.Response{ Value: "hello," + args.GetValue(), } err = stream.Send(resp) if err != nil { // 服务端发送异常,函数退出,服务端流关闭 return err } } } func main() { // 1. 构造一个gRPC服务对象 grpcServer:=grpc.NewServer() // 2. 通过gRPC插件生成的RegisterHelloServiceServer 函数注册我们实现的HelloService服务。 service.RegisterHelloServiceServer(grpcServer,new(HelloService)) // 3. 监听:1234端口 listen,err:=net.Listen("tcp",":1234") if err!=nil{ log.Fatal("Listen TCP err:", err) } // 4. 通过grpcServer.Serve(listen) 在一个监听端口上提供gRPC服务 grpcServer.Serve(listen) }
3.6 Stream 客户端
09-grpc-stream/client/client.proto
func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := service.NewHelloServiceClient(conn) // 客户端先调用Channel方法,获取返回的流对象 stream, err01 := client.Channel(context.Background()) if err01 != nil { log.Fatal(err01) } // 在客户端我们将发送和接收放到两个独立的Goroutine // 首先向服务端发送数据: go func() { for { req := &service.Request{ Value: "小明", } if err := stream.Send(req); err != nil { log.Fatal(err) } time.Sleep(time.Second) } }() // 然后再循环中接收服务端返回的数据 for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) } }
4. gRPC 中间件(认证)
前面我们的rpc都存在一些缺陷,我们的rpc在调用的时候是裸着的,任何一个有这个客户端的人都可以调用,就相当于大门常打开,所以我们需要为他加一把锁。
但是加锁的话有很多方式,比如颁发证书,我为你这个客户端颁发了证书,你才可以调用,这也是最常用的方式,不过这里我们没有这样处理,而是写的一个认证中间件:
4.1 中间件RPC接口解读
4.1.1 正常模式
// UnaryServerInterceptor 提供了一个钩子来拦截服务器上一元RPC的执行。 // info 包含拦截器可以操作的RPC的所有信息。 // handler 是服务方法实现的包装器. // 拦截器负责调用 handler 来完成RPC。 type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
ctx
:请求上下文req
:rpc请求数据info
:服务端相关数据,不用理解这个handler
:处理请求的handler,相对于next()
resp
:rpc相应数据err
:rpc错误
4.1.2 Stream 流模式
// StreamServerInterceptor 提供了一个钩子来拦截服务器上 流RPC 的执行。 // info 包含拦截器可以操作的RPC的所有信息。 // handler是服务方法实现的包装器. // 拦截器负责调用处理程序来完成RPC。 type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
srv
:service信息ss
:Server数据流info
:服务端相关数据,不用理解这个handler
:处理请求的handler,相对于next()
err
:rpc错误
4.2 代码:
4.2.1 protobuf文件
syntax = "proto3"; package hello; option go_package = "MicroServiceStudy01/10-grpc-auth/service"; message Request{ string value = 1; } message Response{ string value = 1; } // The HelloService service definition // service 关键字 // HelloService 服务名称 对应接口的名称 // service服务会对应.pb.go文件里interface,里面的rpc对应接口中的函数 service HelloService{ rpc Hello (Request) returns (Response){} rpc Channel(stream Request) returns (stream Response) {} }
生成代码:
cd ./10-grpc-auth/service protoc -I . --go_out=. --go_opt=module="MicroServiceStudy01/10-grpc-auth/service" --go-grpc_out=. --go-grpc_opt=module="Mic roServiceStudy01/10-grpc-auth/service" ./service.proto
4.2.2 认证中间件代码
1. 客户端
10-grpc-auth/auther/client.go
客户端的核心逻辑就是从meta中取clientId,clientSecret;
package author import "context" type Authentication struct { clientId string clientSecret string } // NewClientAuthentication 构造凭证 func NewClientAuthentication(clientId, clientSecret string) *Authentication { return &Authentication{ clientId: clientId, clientSecret: clientSecret, } } // WithClientCredentials 通过客户端初始化凭证 func (a *Authentication) WithClientCredentials(clientId, clientSecret string) { a.clientId = clientId a.clientSecret = clientSecret } // GetRequestMetadata 从meta中取凭证信息:clientId,clientSecret func (a *Authentication) GetRequestMetadata(context.Context, ...string) ( map[string]string, error) { return map[string]string{ "client_id": a.clientId, "client_secret": a.clientSecret, }, nil } // RequireTransportSecurity 指示凭据是否需要传输安全性。 func (a *Authentication) RequireTransportSecurity() bool { return false }
2. 服务端
10-grpc-auth/auther/server.go
package author import ( "context" "fmt" "github.com/infraboard/mcube/logger" "github.com/infraboard/mcube/logger/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) const ( ClientHeaderKey = "client_id" ClientSecretKey = "client_secret" ) type grpcAuthor struct { log logger.Logger } // 构造并初始化 grpc author func newGrpcAuthor() *grpcAuthor { return &grpcAuthor{ // zap.L(): 返回一个未命名的全局 logger。 // .Named(): 添加一个新的路径段 到 logger 的 名称 。段由句点连接。 log: zap.L().Named("Grpc Author"), } } // GetClientCredentialsFromMeta 从客户端发来的请求中获取凭证信息 func (a *grpcAuthor) GetClientCredentialsFromMeta(md metadata.MD) ( clientId, clientSecret string) { cids := md.Get(ClientHeaderKey) sids := md.Get(ClientSecretKey) if len(cids) > 0 { clientId = cids[0] } if len(sids) > 0 { clientSecret = sids[0] } return } // 验证凭证信息 func (a *grpcAuthor) validateServiceCredential(clientId, clientSecret string) error { if clientId == "" && clientSecret == "" { return status.Errorf(codes.Unauthenticated, "client_id or client_secret is \"\"") } if !(clientId == "admin" && clientSecret == "123456") { return status.Errorf(codes.Unauthenticated, "client_id or client_secret invalidate") } return nil } // Auth 普通模式的拦截器 func (a *grpcAuthor) Auth( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (resp interface{}, err error) { // 从上下文中获取认证信息 md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, fmt.Errorf("ctx is not an grpc incoming context!") } fmt.Println("grpc header info :", md) // 获取客户端凭证信息 clientId, clientSecret := a.GetClientCredentialsFromMeta(md) // 校验调用的客户端携带的凭证是否有效 if err := a.validateServiceCredential(clientId, clientSecret); err != nil { return nil, err } resp, err = handler(ctx, req) return resp, err } // StreamAuth 流模式的拦截器 func (a *grpcAuthor) StreamAuth( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) (err error) { fmt.Println(srv, info) // 从上下文中获取认证信息 // https://www.bilibili.com/video/BV1mi4y1d7SL?p=3&t=5032.0 md, ok := metadata.FromIncomingContext(ss.Context()) if !ok { return fmt.Errorf("ctx is not an grpc incoming context!") } fmt.Println("grpc header info :", md) // 获取客户端凭证 clientId, clientSecret := a.GetClientCredentialsFromMeta(md) // 校验调用的客户端凭证是否有效 if err := a.validateServiceCredential(clientId, clientSecret); err != nil { return err } return handler(srv, ss) } func GrpcAuthUnaryServerInterceptor() grpc.UnaryServerInterceptor { return newGrpcAuthor().Auth } func GrpcAuthStreamServerInterceptor() grpc.StreamServerInterceptor { return newGrpcAuthor().StreamAuth }
4.2.1 客户端代码
10-grpc-auth/client/client.go
package main import ( "MicroServiceStudy01/10-grpc-auth/author" "MicroServiceStudy01/10-grpc-auth/service" "context" "fmt" "google.golang.org/grpc" "log" ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), grpc.WithPerRPCCredentials(author.NewClientAuthentication("admin", "123456")), ) if err != nil { log.Fatal(err) } defer conn.Close() // NewHelloServiceClient函数是xxx_grpc.pb.go中自动生成的函数, // 基于已经建立的连接构造HelloServiceClient对象, // 返回的client其实是一个HelloServiceClient接口对象 // client := service.NewHelloServiceClient(conn) // 通过接口定义的方法就可以调用服务端对应gRPC服务提供的方法 req := &service.Request{Value: "小亮"} reply, err01 := client.Hello(context.Background(), req) if err01 != nil { log.Fatal(err01) } fmt.Println(reply.GetValue()) }
4.2.3 服务端代码
10-grpc-auth/server/server.go
package main import ( "MicroServiceStudy01/10-grpc-auth/author" "MicroServiceStudy01/10-grpc-auth/service" "context" "google.golang.org/grpc" "log" "net" ) type HelloService struct { // UnimplementedHelloServiceServer这个结构体是必须要内嵌进来的 // 也就是说我们定义的这个结构体对象必须继承UnimplementedHelloServiceServer。 // 嵌入之后,我们就已经实现了GRPC这个服务的接口,但是实现之后我们什么都没做,没有写自己的业务逻辑, // 我们要重写实现的这个接口里的函数,这样才能提供一个真正的rpc的能力。 service.UnimplementedHelloServiceServer } var _ service.HelloServiceServer = (*HelloService)(nil) // Hello 重写实现的接口里的Hello函数 func (p *HelloService) Hello(ctx context.Context, req *service.Request) (*service.Response, error){ resp := &service.Response{} resp.Value = "hello:" + req.Value return resp, nil } func main() { // 1. 构造一个gRPC服务对象 grpcServer:=grpc.NewServer( // 添加认证中间件,如果有多个中间件需要添加,使用ChainUnaryInterceptor grpc.UnaryInterceptor(author.GrpcAuthUnaryServerInterceptor()), // 添加stream API拦截器 grpc.StreamInterceptor(author.GrpcAuthStreamServerInterceptor()), ) // 2.通过gRPC插件生成的RegisterHelloServiceServer 函数注册我们实现的HelloService服务。 service.RegisterHelloServiceServer(grpcServer,new(HelloService)) // 3. 监听:1234端口 listen,err:=net.Listen("tcp",":1234") if err!=nil{ log.Fatal("Listen TCP err:", err) } // 4. 通过grpcServer.Serve(listen) 在一个监听端口上提供gRPC服务 grpcServer.Serve(listen) }