建立一个流式的RPC通信 | 青训营笔记

本文涉及的产品
Digicert DV 证书 单域名,20个 3个月
简介: 建立一个流式的RPC通信 | 青训营笔记

前言

在前面的笔记中,已经知道了如何利用gRPC建立简单RPC通信。但这样简单的实现有时候满足不了的业务需求。在一些场景中需要防止数据被劫持,或是一些场景中希望客户端与服务器不是简单的一问一答,而是建立起一个流式的RPC通信,那么该怎么做到

TLS加密通信

TLS加密无非就是认证客户端与服务器,如果对SSL/TLS加密通信有所了解的童鞋都知道首先需要两张证书。

所以作为准备工作,首先要申请两张测试证书。一张客户端证书,一张服务器证书。

生成测试证书

利用MySSL测试证书生成工具可以很简单的生成两张证书,如下所示:

如图,填入域名生成一张服务器证书,然后将私钥,证书链,根证书都下载下来,保存到文件。

image.png

image.png同样,生成一张客户端证书并保存。

image.png

客户端与服务器TLS认证

gRPC通信中,完成服务器认证与客户端认证主要使用的是grpc下的credentials库。

服务端实现

func main() {
    lis, err := net.Listen("tcp", ":8080")
    if err != nil {
        panic(err)
    }
    // 加载证书和密钥 (同时能验证证书与私钥是否匹配)
    cert, err := tls.LoadX509KeyPair("certs/test_server.pem", "certs/test_server.key")
    if err != nil {
        panic(err)
    }
    // 将根证书加入证书池
    // 测试证书的根如果不加入可信池,那么测试证书将视为不可惜,无法通过验证。
    certPool := x509.NewCertPool()
    rootBuf, err := ioutil.ReadFile("certs/root.pem")
    if err != nil {
        panic(err)
    }
    if !certPool.AppendCertsFromPEM(rootBuf) {
        panic("fail to append test ca")
    }
    tlsConf := &tls.Config{
        ClientAuth:   tls.RequireAndVerifyClientCert,
        Certificates: []tls.Certificate{cert},
        ClientCAs:    certPool,
    }
    serverOpt := grpc.Creds(credentials.NewTLS(tlsConf))
    grpcServer := grpc.NewServer(serverOpt)
    pb.RegisterHelloWorldServiceServer(grpcServer, &SayHelloServer{})
    log.Println("Server Start...")
    grpcServer.Serve(lis)
}

客户端实现

func main() {
    cert, err := tls.LoadX509KeyPair("certs/test_client.pem", "certs/test_client.key")
    if err != nil {
        panic(err)
    }
    // 将根证书加入证书池
    certPool := x509.NewCertPool()
    bs, err := ioutil.ReadFile("certs/root.pem")
    if err != nil {
        panic(err)
    }
    if !certPool.AppendCertsFromPEM(bs) {
        panic("fail to append test ca")
    }
    // 新建凭证
    // ServerName 需要与服务器证书内的通用名称一致
    transportCreds := credentials.NewTLS(&tls.Config{
        ServerName:   "server.razeen.me",
        Certificates: []tls.Certificate{cert},
        RootCAs:      certPool,
    })
    dialOpt := grpc.WithTransportCredentials(transportCreds)
    conn, err := grpc.Dial("localhost:8080", dialOpt)
    if err != nil {
        log.Fatalf("Dial failed:%v", err)
    }
    defer conn.Close()
    client := pb.NewHelloWorldServiceClient(conn)
    resp1, err := client.SayHelloWorld(context.Background(), &pb.HelloWorldRequest{
        Greeting: "Hello Server 1 !!",
        Infos:    map[string]string{"hello": "world"},
    })
    if err != nil {
        log.Printf("%v", err)
    }
    log.Printf("Resp1:%+v", resp1)
    resp2, err := client.SayHelloWorld(context.Background(), &pb.HelloWorldRequest{
        Greeting: "Hello Server 2 !!",
    })
    if err != nil {
        log.Printf("%v", err)
    }
    log.Printf("Resp2:%+v", resp2)
}

从代码中,不难看出,主要是创建一个通信凭证(TransportCredentials)。利用credentials库的NewTLS方法从tls加载一个通信凭证用于通信。而在其中需要注意的是:

  • 如果你使用的是自签发的证书,注意将根加入证书池。如果你使用的是可信CA签发的证书大部分不用添加,因为系统的可信CA库已经有了。如果没有成功添加, 在通信时会出现以下错误:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = “transport: authentication handshake failed: x509: certificate signed by unknown authority”

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = “transport: authentication handshake failed: remote error: tls: bad certificate”

  • 客户端凭证内 ServerName 需要与服务器证书内的通用名称一致,如果不一致会出现如下错误:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = “transport: authentication handshake failed: x509: certificate is valid for server.razeen.me, not xxxxx”

之后,就可安心的通信了,在私钥不泄漏的情况下,基本不再担心数据劫持问题了。

这里我想多说一句: 经常在提交代码时会直接 git add . ,这是个不好的习惯,有时后会将一些不必要的文件提交上去,特别是一些证书私钥密码之类的文件。

流式的RPC通信

流式PRC通信可以分为:

  • 服务器端流式 RPC;
    客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。如:
rpc ListHello(HelloWorldRequest) returns (stream HelloWorldResponse) {}
  • 客户端流式 RPC;
    客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。如:
rpc SayMoreHello(stream HelloWorldRequest) returns (HelloWorldResponse) {}
  • 双向流式 RPC;
    双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写。如:
rpc SayHelloChat(stream HelloWorldRequest) returns (stream HelloWorldRequest) {}

从上面的定义不难看出,用stream可以定义一个流式消息。下面就通过实例来演示一下流式通信的使用方法。

首先,将上面三个rpc server加入.proto , 并且生成新的.pb.go代码。

在生成的代码hello_world.pb.go中,可以看到客户端接口如下:

type HelloWorldServiceClient interface {
    SayHelloWorld(ctx context.Context, in *HelloWorldRequest, opts ...grpc.CallOption) (*HelloWorldResponse, error)
    ListHello(ctx context.Context, in *HelloWorldRequest, opts ...grpc.CallOption) (HelloWorldService_ListHelloClient, error)
    SayMoreHello(ctx context.Context, opts ...grpc.CallOption) (HelloWorldService_SayMoreHelloClient, error)
    SayHelloChat(ctx context.Context, opts ...grpc.CallOption) (HelloWorldService_SayHelloChatClient, error)
}

服务端接口如下:

// HelloWorldServiceServer is the server API for HelloWorldService service.
type HelloWorldServiceServer interface {
    SayHelloWorld(context.Context, *HelloWorldRequest) (*HelloWorldResponse, error)
    ListHello(*HelloWorldRequest, HelloWorldService_ListHelloServer) error
    SayMoreHello(HelloWorldService_SayMoreHelloServer) error
    SayHelloChat(HelloWorldService_SayHelloChatServer) error
}

在客户段的接口中,生成了HelloWorldService_XXXXClient接口类型。 在服务端的接口中,生成了HelloWorldService_XXXXServer接口类型。 再查看这些接口的定义,发现这这几个接口都是实现了以下几个方法中的数个:

Send(*HelloWorldRequest) error
Recv() (*HelloWorldRequest, error)
CloseAndRecv() (*HelloWorldResponse, error)
grpc.ClientStream

看其名字,不难知道,流式RPC的使用,或者说流的收发也就离不开这几个方法了。下面通过几个实例来验证一下。

在服务端,实现这三个接口。

// 服务器端流式 RPC, 接收一次客户端请求,返回一个流
func (s *SayHelloServer) ListHello(in *pb.HelloWorldRequest, stream pb.HelloWorldService_ListHelloServer) error {
    log.Printf("Client Say: %v", in.Greeting)
    // 返回多条数据
    stream.Send(&pb.HelloWorldResponse{Reply: "ListHello Reply " + in.Greeting + " 1"})
    time.Sleep(1 * time.Second)
    stream.Send(&pb.HelloWorldResponse{Reply: "ListHello Reply " + in.Greeting + " 2"})
    time.Sleep(1 * time.Second)
    stream.Send(&pb.HelloWorldResponse{Reply: "ListHello Reply " + in.Greeting + " 3"})
    time.Sleep(1 * time.Second)
    return nil
}
// 客户端流式 RPC, 客户端流式请求,服务器可返回一次
func (s *SayHelloServer) SayMoreHello(stream pb.HelloWorldService_SayMoreHelloServer) error {
    // 接受客户端请求
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        log.Printf("SayMoreHello Client Say: %v", req.Greeting)
    }
    // 流读取完成后,返回
    return stream.SendAndClose(&pb.HelloWorldResponse{Reply: "SayMoreHello Recv Muti Greeting"})
}
// 双向流式 RPC
func (s *SayHelloServer) SayHelloChat(stream pb.HelloWorldService_SayHelloChatServer) error {
    // 开一个协程去处理客户端数据
    go func() {
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                return
            }
            log.Printf("SayHelloChat Client Say: %v", req.Greeting)
        }
    }()
    // 向客户端写入多条数据
    stream.Send(&pb.HelloWorldRequest{Greeting: "SayHelloChat Server Say Hello 1"})
    time.Sleep(1 * time.Second)
    stream.Send(&pb.HelloWorldRequest{Greeting: "SayHelloChat Server Say Hello 2"})
    time.Sleep(1 * time.Second)
    stream.Send(&pb.HelloWorldRequest{Greeting: "SayHelloChat Server Say Hello 3"})
    time.Sleep(1 * time.Second)
    return nil
}

之后就可以在客户端分别请求这几个rpc服务。

// 服务器端流式 RPC;
    // 向服务器SayHello 
    recvListHello, err := client.ListHello(context.Background(), &pb.HelloWorldRequest{Greeting: "Hello Server List Hello"})
    if err != nil {
        log.Fatalf("ListHello err: %v", err)
    }
    // 服务器以流式返回
    // 直到 err==io.EOF时,表示接收完毕。
    for {
        resp, err := recvListHello.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("ListHello Server Resp: %v", resp.Reply)
    }
// Client Out:
// 2018/08/06 01:27:55 ListHello Server Resp: ListHello Reply Hello Server List Hello 1
// 2018/08/06 01:27:56 ListHello Server Resp: ListHello Reply Hello Server List Hello 2
// 2018/08/06 01:27:57 ListHello Server Resp: ListHello Reply Hello Server List Hello 3
// Server Out:
// 2018/08/06 01:27:55 Client Say: Hello Server List Hello
    // 客户端流式 RPC;
    sayMoreClient, err := client.SayMoreHello(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for i := 0; i < 3; i++ {
        sayMoreClient.Send(&pb.HelloWorldRequest{Greeting: fmt.Sprintf("SayMoreHello Hello Server %d", i)})
    }
    sayMoreResp, err := sayMoreClient.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("SayMoreHello Server Resp: %v", sayMoreResp.Reply)
// Client Out:
// 2018/08/06 01:31:11 SayMoreHello Server Resp: SayMoreHello Recv Muti Greeting
// Server Out:
// 2018/08/06 01:31:11 SayMoreHello Client Say: SayMoreHello Hello Server 0
// 2018/08/06 01:31:11 SayMoreHello Client Say: SayMoreHello Hello Server 1
// 2018/08/06 01:31:11 SayMoreHello Client Say: SayMoreHello Hello Server 2
    // 双向流式 RPC;
    sayHelloChat, err := client.SayHelloChat(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    go func() {
        for i := 0; i < 3; i++ {
            sayHelloChat.Send(&pb.HelloWorldRequest{Greeting: fmt.Sprintf("SayHelloChat Hello Server %d", i)})
        }
    }()
    for {
        resp, err := sayHelloChat.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("SayHelloChat Server Say: %v", resp.Greeting)
    }
// Client Out:
// 2018/08/06 01:31:11 SayHelloChat Server Say: SayHelloChat Server Say Hello 1
// 2018/08/06 01:31:12 SayHelloChat Server Say: SayHelloChat Server Say Hello 2
// 2018/08/06 01:31:13 SayHelloChat Server Say: SayHelloChat Server Say Hello 3
// Server Out:
// 2018/08/06 01:31:11 SayHelloChat Client Say: SayHelloChat Hello Server 0
// 2018/08/06 01:31:11 SayHelloChat Client Say: SayHelloChat Hello Server 1
// 2018/08/06 01:31:11 SayHelloChat Client Say: SayHelloChat Hello Server 2

总结

看了实例,是不是觉得很简单~。三种方式大同小异,只要掌握了怎么去收发流,怎么判断流的结束,基本就可以了。

目录
相关文章
|
Dubbo Java 应用服务中间件
由浅入深RPC通信原理实战1
由浅入深RPC通信原理实战1
78 0
|
4月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
113 1
|
7月前
|
消息中间件 缓存 API
|
负载均衡 算法 Dubbo
由浅入深RPC通信原理实战2
由浅入深RPC通信原理实战2
81 0
|
消息中间件 微服务
微服务通信:RPC、消息队列和事件驱动架构的比较
在微服务架构中,微服务之间的通信是至关重要的。为了实现松耦合、高效可靠的通信,开发人员可以选择不同的通信方式,包括RPC(远程过程调用)、消息队列和事件驱动架构。本文将对这三种常见的微服务通信方式进行比较,探讨它们的特点、适用场景和优缺点,帮助开发人员选择合适的通信方式。
352 0
|
存储 JavaScript Linux
开源 Golang 微服务入门二:RPC 框架 Kitex| 青训营笔记
Kitex 字节跳动内部的 Golang 微服务 RPC 框架,具有高性能、强可扩展的特点,在字节内部已广泛使用。如果对微服务性能有要求,又希望定制扩展融入自己的治理体系,Kitex 会是一个不错的选
308 0
开源 Golang 微服务入门二:RPC 框架 Kitex| 青训营笔记
|
编解码 运维 负载均衡
深入浅出RPC框架|青训营笔记
由于课程涉及到的RPC知识需要自己对其有较为全面的理解后才能比较好的get到课程中提及的各种框架设计的点,因此我建议阅读Kitex框架的源码,再结合课程目录去体会Kitex设计的初衷。
192 0
深入浅出RPC框架|青训营笔记
|
消息中间件 存储 设计模式
Seata 高性能 RPC 通信的实现- 巧用 reactor 模式
reactor 模式是一种事件驱动的应用层 I/O 处理模式,基于分而治之和事件驱动的思想,致力于构建一个高性能的可伸缩的 I/O 处理模式
185 0
|
前端开发 JavaScript Java
Seata 高性能RPC通信的实现基石-Netty篇
Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
185 0
|
分布式计算 Dubbo Hadoop
使用Hadoop提供的工具实现基于Proto协议的RPC通信
使用Hadoop提供的工具实现基于Proto协议的RPC通信
244 0
下一篇
DataWorks