最简单的 gRPC 教程—3 拦截器、截止、取消

简介: 前面的两篇文章已经基本讲述了 gRPC 的使用方法,相信已经能够应对大多数开发场景了,只不过我们有时候需要针对业务作出一些定制,这时候就必须要更加了解 gRPC 的一些特性了,比如:• 拦截器• 截止时间• 取消

前面的两篇文章已经基本讲述了 gRPC 的使用方法,相信已经能够应对大多数开发场景了,只不过我们有时候需要针对业务作出一些定制,这时候就必须要更加了解 gRPC 的一些特性了,比如:

  • 拦截器
  • 截止时间
  • 取消


拦截器


我把前一篇文章的订单 Order 服务的代码文件拷贝了一份,用来演示这一节的内容。

gRPC 在服务端和客户端均可以实现拦截器,首先来看一下服务端的拦截器。


服务端拦截器

针对一元通信模式,有对应的一元拦截器,拦截器的方法定义如下:

// 一元拦截器
func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) {
   //前置处理
   log.Println("==========[Server Unary Interceptor]===========", info.FullMethod)
   //完成方法的正常执行
   res, err = handler(ctx, req)
   //后置处理
   log.Printf("After method call, res = %+v\n", res)
   return
}


然后需要在服务端的 main 方法中注册一下这个拦截器:

// ...
// 注册拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor))
// ...


实现完成之后,所有服务端的一元通信模式的方法,都会被这个方法所拦截。

针对服务端流模式,也有对应的流拦截器,方法的定义稍微有一些区别了。

// 服务端流拦截器
type WrappedServerStream struct {
   grpc.ServerStream
}
func (w *WrappedServerStream) SendMsg(m interface{}) error {
   log.Printf("[order stream server interceptor] send a msg : %+v", m)
   return w.ServerStream.SendMsg(m)
}
func (w *WrappedServerStream) RecvMsg(m interface{}) error {
   log.Printf("[order stream server interceptor] recv a msg : %+v", m)
   return w.ServerStream.RecvMsg(m)
}
func NewWrappedServerStream(s grpc.ServerStream) *WrappedServerStream {
   return &WrappedServerStream{s}
}
func orderStreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   log.Printf("=========[order stream]start %s\n", info.FullMethod)
   //执行方法
   err := handler(srv, NewWrappedServerStream(ss))
   if err != nil {
      log.Println("handle method err.", err)
   }
   log.Printf("=========[order stream]end")
   return nil
}


在接收和发送消息时,都可以进行拦截,这样可以根据业务所需定制化。

在服务端的 main 方法中,还是需要注册一下这个拦截器:

// 注册拦截器
s := grpc.NewServer(
   grpc.UnaryInterceptor(orderUnaryServerInterceptor),
   grpc.StreamInterceptor(orderStreamServerInterceptor),
)


客户端拦截器

和服务端类似,客户端拦截器也分为了一元拦截器和流拦截器,分别对应不同的通信模式。


首先来看看一元拦截器。

//客户端一元拦截器
func UnaryClientOrderInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
   log.Println("=========[client interceptor] ",  method)
   err = invoker(ctx, method, req, reply, cc, opts...)
   if err != nil {
      log.Println("invoke method err.", err)
   }
   log.Println("=========[client interceptor] end. reply : ", reply)
   return
}


然后也需要在监听服务端的时候注册拦截器:

// ...
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor))
// ...


然后再来看下客户端流拦截器,先看下拦截器方法定义:

// 客户端流拦截器
type WrappedClientStream struct {
   grpc.ClientStream
}
func (w *WrappedClientStream) SendMsg(m interface{}) error {
   log.Printf("===========[client interceptor] send msg : %+v", m)
   return w.ClientStream.SendMsg(m)
}
func (w *WrappedClientStream) RecvMsg(m interface{}) error {
   log.Printf("============[client interceptor] recv msg : %+v", m)
   return w.ClientStream.RecvMsg(m)
}
func NewWrappedClientStream(s grpc.ClientStream) *WrappedClientStream {
   return &WrappedClientStream{s}
}
func StreamClientOrderInterceptor (ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   log.Printf("===========[client msg]start, method = %+v\n", method)
   clientStream, err := streamer(ctx, desc, cc, method, opts...)
   if err != nil {
      return nil, err
   }
   return NewWrappedClientStream(clientStream), nil
}


最后也需要在客户端的 main 方法中注册拦截器:

conn, err := grpc.Dial(address, 
   grpc.WithInsecure(),
   grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor),    //注册拦截器
   grpc.WithStreamInterceptor(StreamClientOrderInterceptor),
)


截止时间

再来看一下另一个常用的模式截止时间,gRPC 常用于微服务之间的通信,而微服务中,服务之间的调用往往存在不确定性,比如网络环境差,数据聚合量太大等等,都有可能会导致服务调用者长时间等待响应结果。

这样对于用户体验来说非常的不好,并且也很浪费资源,所以微服务之间需要采用快速失败的模式,及时释放资源,不堆积请求,这样对系统的压力也会更小。

在 Go 语言中,对 gRPC 截止时间的控制,主要是使用 context 来实现的。下面是一个简单的 demo:

// 使用带有截止时间的context
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5 * time.Second)) //适当调整截止时间观察不同的调用效果
defer cancel()
client := order.NewOrderManagementClient(conn)
AddOrder(ctx, client)

这里使用了带有截止时间的 context,如果 AddOrder 方法的调用超过了截止时间,那么调用就会被取消。


取消

在某些情况下,我们可能需要取消 RPC 请求,这和设置截止时间有些类似,都是为了避免请求挂起让客户端一直等待,在 Go 语言中,取消的操作仍然借助于 context 来实现。

以一个简单的例子来说明取消 RPC 的操作:

// 取消RPC请求
func cancelRpcRequest(client order.OrderManagementClient) {
   ctx, cancelFunc := context.WithCancel(context.Background())
   done := make(chan string)
   go func() {
      var id string
      defer func() {
         fmt.Println("结束执行, id = ", id)
         done <- id
      }()
      time.Sleep(2 * time.Second)
      id = AddOrder(ctx, client)
      log.Println("添加订单成功, id = ", id)
   }()
   //等待一秒后取消
   time.Sleep(time.Second)
   cancelFunc()
   <-done
}


相关文章
|
1月前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
420 0
|
1月前
|
前端开发
若依框架---如何防止请求重复提交?
若依框架---如何防止请求重复提交?
400 2
|
1月前
|
Java 数据库
SpringBoot手动取消接口执行方案
实际开发中经常会遇到比较耗时的接口操作,但页面强制刷新或主动取消接口调用后后台还是会继续运行,特别是有大量数据库操作时会增加服务器压力,所以进行研究测试后总结了一套主动取消接口调用的解决方案
48 0
|
1月前
|
存储 JSON Java
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
71 2
|
1月前
|
移动开发 前端开发
基于Jeecg-boot的flowable流程支持拒绝同意流程操作(二)
基于Jeecg-boot的flowable流程支持拒绝同意流程操作(二)
37 0
|
1月前
|
移动开发 前端开发
基于Jeecg-boot的flowable流程支持拒绝同意流程操作
基于Jeecg-boot的flowable流程支持拒绝同意流程操作
47 0
|
1天前
|
JSON Java API
技术笔记:springboot项目使用拦截器实现一个简单的网关请求透传
技术笔记:springboot项目使用拦截器实现一个简单的网关请求透传
|
1月前
|
NoSQL Java API
SpringBoot项目中防止表单重复提交的两种方法(自定义注解解决API接口幂等设计和重定向)
SpringBoot项目中防止表单重复提交的两种方法(自定义注解解决API接口幂等设计和重定向)
173 0
|
9月前
|
Java API Nacos
Nacos服务健康检查与服务变动事件发布源码解析
Nacos服务健康检查与服务变动事件发布源码解析
71 0
|
设计模式 Java 调度
SpringBoot 事件发布监听机制使用、分析、注意点 (一篇到位)
SpringBoot 事件发布监听机制使用、分析、注意点 (一篇到位)
1434 1
SpringBoot 事件发布监听机制使用、分析、注意点 (一篇到位)