01
为什么使用 gRPC?
借助 gRPC,我们可以在 .proto 文件中一次定义我们的服务,并以 gRPC 支持的任何语言生成客户端和服务器代码,无论是在大型数据中心内的服务器,还是在个人的电脑的环境中,这些客户端和服务器代码都可以运行 – gRPC 可以为您处理不同语言和环境之间的通信。我们还获得了使用 protocol buffers 的所有优点,包括有效的序列化,简单的 IDL 和容易的接口更新。
我们的示例是一个简单的路由映射应用程序,它使客户端可以获取有关其路由功能的信息,创建其路由的摘要以及与服务器和其他客户端交换路由信息(例如流量更新)。
02
准备工作
- 安装 Go 最新正式发行版本。
- 安装 protocol buffers 编译器 protoc。请参考「Protobuf - 更小、更快、更简单的交互式数据语言」- Part 05。
- 安装编译器 protoc 的 Go 插件。请参考「gRPC 初探与简单使用」- Part 04。
- git clone 示例代码,并进入该目录。
$ git clone https://github.com/grpc/grpc-go $ cd grpc-go/examples/route_guide
03
定义服务并生成客户端和服务器代码
我们的第一步是使用 protocol buffers 定义 gRPC 服务以及方法请求和响应类型。
有关完整的
.proto 文件,请参阅 Part 2 git clone 的代码 routeguide/route_guide.proto。
要定义服务,请在 .proto 文件中指定一个命名服务:
service RouteGuide { ... }
然后,在服务定义中定义 rpc 方法,并指定它们的请求和响应类型。gRPC 允许您定义四种服务方法,所有这些方法都在 RouteGuide 服务中使用:
- 一个简单的 RPC,客户端使用存根将请求发送到服务器,然后等待响应返回,就像正常的函数调用一样。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
- 服务器端流式 RPC,客户端在其中向服务器发送请求,并获取流以读取回一系列消息。客户端从返回的流中读取,直到没有更多消息为止。如我们的示例所示,您可以通过在响应类型之前放置 stream 关键字来指定服务器端流方法。
// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 客户端流式 RPC,客户端在其中编写消息序列,然后再次使用提供的流将其发送到服务器。客户端写完消息后,它将等待服务器读取所有消息并返回其响应。通过将 stream 关键字放在请求类型之前,可以指定客户端流方法。
// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 双向流式 RPC,双方都使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取一条消息再写入一条消息,或读写的其他组合。每个流中的消息顺序都会保留。您可以通过在请求和响应之前都放置 stream 关键字来指定这种类型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件还包含用于服务方法中所有请求和响应类型的 protocol buffers message 类型定义-例如,这是 Point message 类型:
// Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point { int32 latitude = 1; int32 longitude = 2; }
接下来,我们需要根据 .proto 服务定义生成 gRPC 客户端和服务器接口。我们使用带有特殊 gRPC Go 插件的 protocol buffers 编译器 protoc 来执行此操作。
在 examples/route_guide 目录中,运行以下命令:
$ protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ routeguide/route_guide.proto
运行此命令将在 routeguide 目录中生成以下文件:
- route_guide.pb.go,其中包含用于填充,序列化和检索请求和响应消息类型的所有 protocol buffers 代码。
- route_guide_grpc.pb.go,其中包含以下内容:
- 客户端使用 RouteGuide 服务中定义的方法调用的接口类型(或存根)。
- 服务器要实现的接口类型,也具有 RouteGuide 服务中定义的方法。
04
创建服务器
首先,让我们看一下如何创建 RouteGuide 服务器。
使我们的 RouteGuide 服务完成其工作包括两个部分:
- 实施根据我们的服务定义生成的服务接口:完成我们服务的实际“工作”。
- 运行 gRPC 服务器以监听来自客户端的请求,并将其分派到正确的服务实现。
您可以在 server/server.go 中找到我们的示例 RouteGuide 服务器。让我们仔细看看它是如何工作的。
实现 RouteGuide
如您所见,我们的服务器具有一个 routeGuideServer 结构体类型,该结构体类型实现了生成的 RouteGuideServer 接口:
type routeGuideServer struct { ... } ... func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) { ... } ... func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error { ... } ... func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error { ... } ... func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error { ... } ...
简单的 RPC
routeGuideServer 实现我们所有的服务方法。首先,让我们看一下最简单的类型 GetFeature,该类型仅从客户端获取一个 Point,然后从其数据库中的Feature 中返回相应的 Feature 信息。
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) { for _, feature := range s.savedFeatures { if proto.Equal(feature.Location, point) { return feature, nil } } // No feature was found, return an unnamed feature return &pb.Feature{Location: point}, nil }
该方法传递了 RPC 和客户端的 Point protocol buffer 请求的上下文对象。它返回 Feature protocol buffer 对象以及响应信息和错误。在该方法中,我们使用适当的信息填充功能,然后将其返回并返回 nil 错误,以告知 gRPC 我们已经完成了对 RPC 的处理,并且可以将 Feature 返回给客户端。
服务器端流式 RPC
现在,让我们看一下其中的流式 RPC。ListFeatures 是服务器端流式 RPC,因此我们需要将多个 Feature 发送回客户端。
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error { for _, feature := range s.savedFeatures { if inRange(feature.Location, rect) { if err := stream.Send(feature); err != nil { return err } } } return nil }
如您所见,这次我们没有获得简单的请求和响应对象,而是获得了一个请求对象(客户端要在其中找到 Feature 的 Rectangle)
和一个特殊的 RouteGuide_ListFeaturesServer 对象来编写响应。
在该方法中,我们填充了我们需要返回的所有 Feature 对象,并使用其 Send() 方法将它们写入 RouteGuide_ListFeaturesServer。最后,就像在简单的 RPC 中一样,我们返回 nil 错误来告诉 gRPC 我们已经完成了响应的编写。如果此调用中发生任何错误,我们将返回非 nil 错误;gRPC 层会将其转换为适当的 RPC 状态,以在线上发送。
客户端流式 RPC
现在,让我们看一些更复杂的事情:客户端流方法 RecordRoute,从客户端获取 Point 流,并返回一个包含行程信息的 RouteSummary。如您所见,这次方法完全没有 request 参数。
相反,它获得一个
RouteGuide_RecordRouteServer 流,服务器可以使用该流来读取和写入消息-它可以使用 Recv() 方法接收客户端消息,并使用SendAndClose() 方法返回其单个响应。
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error { var pointCount, featureCount, distance int32 var lastPoint *pb.Point startTime := time.Now() for { point, err := stream.Recv() if err == io.EOF { endTime := time.Now() return stream.SendAndClose(&pb.RouteSummary{ PointCount: pointCount, FeatureCount: featureCount, Distance: distance, ElapsedTime: int32(endTime.Sub(startTime).Seconds()), }) } if err != nil { return err } pointCount++ for _, feature := range s.savedFeatures { if proto.Equal(feature.Location, point) { featureCount++ } } if lastPoint != nil { distance += calcDistance(lastPoint, point) } lastPoint = point } }
在方法主体中,我们使用
RouteGuide_RecordRouteServer的 Recv() 方法重复读取客户端对请求对象(在本例中为Point)的请求,直到没有更多消息为止:服务器需要检查从 Read() 返回的错误。每个 call。如果为 nil,则流仍然良好,并且可以继续读取;否则为 0。如果是 io.EOF,则消息流已结束,服务器可以返回其 RouteSummary。如果它具有其他值,我们将返回“原样”错误,以便 gRPC 层将其转换为 RPC 状态。
双向流式 RPC
最后,让我们看一下双向流式 RPC RouteChat() 。
flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } var opts []grpc.ServerOption ... grpcServer := grpc.NewServer(opts...) pb.RegisterRouteGuideServer(grpcServer, newServer()) grpcServer.Serve(lis)
这次,我们获得一个 RouteGuide_RouteChatServer 流,就像在客户端流示例中一样,该流可用于读取和写入消息。但是,这次,当客户端仍在向其消息流中写入消息时,我们通过方法的流返回值。
此处的读写语法与我们的客户端流式传输方法非常相似,不同之处在于服务器使用流的 Send() 方法而不是 SendAndClose() ,因为服务器正在写多个响应。尽管双方总是会按照对方的写入顺序来获取对方的消息,但是客户端和服务器都可以以任意顺序进行读取和写入-流完全独立地运行。
启动服务器
一旦实现了所有方法,我们还需要启动 gRPC 服务器,以便客户端可以实际使用我们的服务。以下代码段显示了如何为 RouteGuide 服务执行此操作:
flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } var opts []grpc.ServerOption ... grpcServer := grpc.NewServer(opts...) pb.RegisterRouteGuideServer(grpcServer, newServer()) grpcServer.Serve(lis)
构建和启动服务:
- 使用以下命令指定我们要用于监听客户端请求的端口:
lis,err:= net.Listen(...)。 - 使用 grpc.NewServer(...) 创建 gRPC 服务器的实例。
- 在 gRPC 服务器上注册我们的服务实现。
- 使用我们的端口详细信息在服务器上调用 Serve() 进行阻塞等待,直到进程被杀死或调用 Stop() 为止。