什么是gRPC
服务器流?
gRPC
服务器流是这样实现的:客户端发送一个请求,服务器在同一通道或连接中返回多个消息。客户端从返回的流中读取信息,直到没有更多的信息。
这种方法可以在我们有很多信息,并且我们想把它作为一个缓冲区来处理,分部分读取,并在客户端安全地完成。
想想一个最常见的例子,一个视频流媒体平台,在流程中,客户端不能在一个请求中得到所有的视频来处理它,在这种情况下,我们可以使用一个服务器流,把视频的每个部分按其需要传递给客户端。
例子
在下面的例子中,我们将模拟一个来自客户端的请求,该请求发送一个ID
,然后服务器将在每个消息上响应一系列的字节,这就是模拟一种数据缓冲区。
很简单吧,让我们去实现吧。
项目结构:
注意:对于这段代码,我没有遵循任何架构来串联包,这只是为了告诉你如何实现一个
gRPC
服务器流的客户端和服务器。
Step 1
你需要安装以下依赖项,并确保在环境变量中包含:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Step 2
在这个步骤中,我们将遵循以下几点:
- 定义带有信息和服务器定义的
proto
文件。 - 从
proto
生成golang文件。 - 实现
gRPC
服务器流。 - 实现
gRPC
客户端。
定义带有信息和服务器定义的proto
文件
在proto
文件中,我们将定义两条消息,一条是请求DataRequest
,另一条是响应DataResponse
。
DataRequest: 将被客户端用来消费gRPC服务器。在这个例子中,我们只是定义了一个ID属性。
DataResponse: 将被服务器用来生成响应。在这个例子中,响应将定义缓冲区,它是假数据流的值,以及帮助我们识别包的位置的部分。
另一件重要的事情是StreamingService
的定义,它将被用来准备我们的流媒体服务器,你可以看到在返回值中我们在响应数据类型前定义了关键词stream
,这样做表明我们将使用一个流媒体服务器。
dataStreaming.proto:
syntax = "proto3"; package protofiles; option go_package = "protofiles/data_streaming"; message DataRequest { string id = 1; } message DataResponse { string buffer = 1; int32 part = 2; } service StreamingService { //unary rpc GetDataStreaming(DataRequest) returns (stream DataResponse) {} }
从proto生成golang文件
为了生成所需的文件,你可以使用以下命令:
protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=require_unimplemented_servers=false:. protofiles/data_streaming/dataStreaming.proto
运行该命令后,你会看到两个新的自动生成的文件:
https://pic8.58cdn.com.cn/nowater/webim/big/n_v2c00273679c4e450c813fce3a5f017c83.png
这些文件中的每一个都包含了来自proto
文件的所有转义,这意味着你将在里面找到响应和请求的结构,实现gRPC
服务器和客户端的接口和结构,等等。
实现gRPC
服务器流
为了实现我们的服务器,我们需要定义一个结构,用来实现gRPC
服务方法,这些方法应该与生成的文件中定义的相同。在我们的例子中,我们需要实现GetDataStreaming
。
我们可以在下面的接口中看到生成文件中的签名:
type StreamingServiceServer interface { GetDataStreaming( *DataRequest, StreamingService_GetDataStreamingServer ) error }
在GetDataStreaming
方法中,我们将向客户端发送每条消息,正如你所看到的,我们通过使用for
循环来实现,在每次迭代中我们都会发送流媒体。
现在让我们谈谈定义在main
中的代码。在那里我们可以找到执行服务器所需的代码,它被定义为一个TCP
。我们可以看到的其他重要的事情是对grpc.NewServer()
的调用,它将创建一个新的gRPC
服务器,然后我们需要注册服务器,用方法实现的结构从生成的文件中调用RegisterStreamingServiceServer
。
package server import ( "log" "math/rand" "net" pb "server_stream_example/protofiles/data_streaming" "google.golang.org/grpc" ) type server struct{} // *DataRequest, StreamingService_GetDataStreamingServer) error func (s server) GetDataStreaming(req *pb.DataRequest, srv pb.StreamingService_GetDataStreamingServer) error { log.Println("Fetch data streaming") for i := 0; i < 10; i++ { value := randStringBytes(500) resp := pb.DataResponse{ Part: int32(i), Buffer: value, } if err := srv.Send(&resp); err != nil { log.Println("error generating response") return err } } return nil } func randStringBytes(n int) string { const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" b := make([]byte, n) for i := range b { b[i] = letterBytes[rand.Intn(len(letterBytes))] } return string(b) } func main() { // create listener listener, err := net.Listen("tcp", "localhost:8080") if err != nil { panic("error building server: " + err.Error()) } // create gRPC server s := grpc.NewServer() pb.RegisterStreamingServiceServer(s, server{}) log.Println("start server") if err := s.Serve(listener); err != nil { panic("error building server: " + err.Error()) } }
实现gRPC
客户端
对于我们的客户,我们将使用生成的proto
文件中的代码,这样做我们就可以执行在我们的服务器上定义的功能。
第一步是拨号连接到我们的gRPC
服务器grpc.Dial
,然后我们需要创建一个服务器-客户端的实例,这将允许我们通过函数调用来消费服务器。
与只等待一个响应的单数客户端不同,消费服务器流的客户端需要迭代,直到所有的消息都由服务器发送。
在这个例子中,我们正在检查EOF
错误,以控制客户端何时获得所有的消息。
package main import ( "context" "fmt" "io" "log" "google.golang.org/grpc" pb "server_stream_example/protofiles/data_streaming" ) func main() { // dial to server conn, err := grpc.Dial("localhost:8080", grpc.WithInsecure()) if err != nil { log.Println("Error connecting to gRPC server: ", err.Error()) } defer conn.Close() // create the stream client := pb.NewStreamingServiceClient(conn) req := pb.DataRequest{Id: "123"} stream, err := client.GetDataStreaming(context.Background(), &req) if err != nil { panic(err) // dont use panic in your real project } for { resp, err := stream.Recv() if err == io.EOF { return } else if err == nil { valStr := fmt.Sprintf("Response\n Part: %d \n Val: %s", resp.Part, resp.Buffer) log.Println(valStr) } if err != nil { panic(err) // dont use panic in your real project } } }
运行服务器和客户端服务
server/main.go
https://pic8.58cdn.com.cn/nowater/webim/big/n_v2dd99a008f988437495f5e28f6f17b091.png
client/main.go
https://pic5.58cdn.com.cn/nowater/webim/big/n_v280ef8041d9be4afd9bf90f4df826e15c.png
准备好了! 现在你已经有了开始建立你自己的流媒体gRPC
服务的基础,记住这只是一个指南,帮助你了解它是如何工作和如何实现的。从这里开始,我们就可以开始讨论与gRPC世界有关的更复杂的解决方案了。
如果你在运行客户端出现以下错误时:
panic: rpc error: code = Unavailable desc = connection error: desc = "error reading server preface: http2: frame too large"
调整一下服务端以及客户端所使用的端口即可