RPC 实战:剖析 gRPC 源码,动手实现一个完整的 RPC

简介: 本讲通过剖析gRPC源码,实战实现RPC框架。利用Protocol Buffer定义接口,生成客户端和服务端代码,结合HTTP/2多路复用与PB序列化,详解请求发送、接收及编解码流程,揭示动态代理、序列化等技术在gRPC中的落地应用,帮助读者掌握RPC核心原理与实现。

上一讲我分享了动态代理,其作用总结起来就是一句话:我们可以通过动态代理技术,屏蔽 RPC 调用的细节,从而让使用者能够面向接口编程。
到今天为止,我们已经把 RPC 通信过程中要用到的所有基础知识都讲了一遍,但这些内容多属于理论。这一讲我们就来实战一下,看看具体落实到代码上,我们应该怎么实现一个 RPC 框架?
为了能让咱们快速达成共识,我选择剖析 gRPC 源码(源码地址:https://github.com/grpc/grpc-java)。通过分析 gRPC 的通信过程,我们可以清楚地知道在 gRPC 里面这些知识点是怎么落地到具体代码上的。
gRPC 是由 Google 开发并且开源的一款高性能、跨语言的 RPC 框架,当前支持 C、Java 和 Go 等语言,当前 Java 版本最新 Release 版为 1.27.0。gRPC 有很多特点,比如跨语言,通信协议是基于标准的 HTTP/2 设计的,序列化支持 PB(Protocol Buffer)和 JSON,整个调用示例如下图所示:

如果你想快速地了解一个全新框架的工作原理,我个人认为最快的方式就是从使用示例开始,所以现在我们就以最简单的 HelloWord 为例开始了解。
生成 gRPC 客户端代码
在这个例子里面,我们会定义一个 say 方法,调用方通过 gRPC 调用服务提供方,然后服务提供方会返回一个字符串给调用方。
首先我们先来准备使用 gRpc 的环境,笔者这里使用 gradle,添加依赖
implementation 'io.grpc:grpc-netty-shaded:1.27.0'
implementation 'io.grpc:grpc-protobuf:1.27.0'
implementation 'io.grpc:grpc-stub:1.27.0'
添加 protobuf-gradle-plugin 插件,用于根据 IDL 生成基础代码
plugins {
id 'com.google.protobuf' version '0.8.15'
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.12.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.27.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
为了保证调用方和服务提供方能够正常通信,我们需要先约定一个通信过程中的契约,也就是我们在 Java 里面说的定义一个接口,这个接口里面只会包含一个 say 方法。在 gRPC 里面定义接口是通过写 Protocol Buffer 代码,从而把接口的定义信息通过 Protocol Buffer 语义表达出来。HelloWord 的 Protocol Buffer 代码如下所示:
该文件需要按照 grpc 插件的定义放在 src/main/proto/ 目录下,完整路径为 src/main/proto/helloworld.proto
syntax = "proto3";
option java_multiple_files = true;
// 定义这些类生成到的路径
option java_package = "cn.mrcode.study.rpc.s06.grpc.hello";
option java_outer_classname = "HelloProto";
option objc_class_prefix = "HLW";
package hello;

// 定义 RPC 服务接口
service HelloService{
rpc Say(HelloRequest) returns (HelloReply) {}
}

// 定义请求对象
message HelloRequest {
string name = 1;
}

// 定义响应对象
message HelloReply {
string message = 1;
}
然后运行 gradle task generateProto ,这个可以在 idea 右侧的 gradle 面板中的 other 中找到该任务运行,任务运行之后,产生的结果文件在 build/generated/source/proto/main/java 目录下,是安装我们配置好的 java_package 生成的,需要我们将这个包路径下的类文件拷贝到我们正常的代码目录中,而另外一个 HelloServiceGrpc 类的包路径前面单独加了 grpc ,如下图所示

如果你的是 maven 项目,也可以 参考官网文档 的方式进行配置,或则这两种构建项目的方式你都不用,也可以使用命令行的方式生成。
发送原理
生成完基础代码以后,我们就可以基于生成的代码写下调用端代码,具体如下
package cn.mrcode.study.rpc.s06;

import java.util.concurrent.TimeUnit;

import cn.mrcode.study.rpc.s06.grpc.hello.HelloReply;
import cn.mrcode.study.rpc.s06.grpc.hello.HelloRequest;
import cn.mrcode.study.rpc.s06.grpc.hello.HelloServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

public class HelloWorldClient {

private final ManagedChannel channel;
private final HelloServiceGrpc.HelloServiceBlockingStub blockingStub;

/**
 * 通过 ip:端口 构建 Channel 连接
 **/
public HelloWorldClient(String host, int port) {
    this(ManagedChannelBuilder.forAddress(host, port)
            .usePlaintext()
            .build());
}

/**
 * 构建 Stub 用于发请求
 **/
HelloWorldClient(ManagedChannel channel) {
    this.channel = channel;
    blockingStub = HelloServiceGrpc.newBlockingStub(channel);
}

/**
 * 调用完手动关闭
 **/
public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

/**
 * 发送rpc请求
 **/
public void say(String name) {
    // 构建入参对象
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    HelloReply response;
    try {
        // 发送请求
        response = blockingStub.say(request);
    } catch (StatusRuntimeException e) {
        return;
    }
    System.out.println(response);
}

public static void main(String[] args) throws Exception {
    HelloWorldClient client = new HelloWorldClient("127.0.0.1", 50051);
    try {
        client.say("world");
    } finally {
        client.shutdown();
    }
}

}
调用端代码大致分成三个步骤:

  1. 首先用 host 和 port 生成 channel 连接;
  2. 然后用前面生成的 HelloService gRPC 创建 Stub 类;
  3. 最后我们可以用生成的这个 Stub 调用 say 方法发起真正的 RPC 调用,后续其它的 RPC 通信细节就对我们使用者透明了。
    为了能看清楚里面具体发生了什么,我们需要进入到 ClientCalls.blockingUnaryCall 方法里面(就是当我们调用 blockingStub.say(request) 方法时,该方法源码中调用的其他方法 )看下逻辑细节。但是为了避免太多的细节影响你理解整体流程,我在下面这张图中只画下了最重要的部分。

我们可以看到,在调用端代码里面,我们只需要一行(HelloWorldClient 中第 54 行)代码就可以发起一个 RPC 调用,而具体这个请求是怎么发送到服务提供者那端的呢?这对于我们 gRPC 使用者来说是完全透明的,我们只要关注是怎么创建出 stub 对象的就可以了。
比如入参是一个字符对象,gRPC 是怎么把这个对象传输到服务提供方的呢?因为在 第 03 讲 中我们说过,只有二进制才能在网络中传输,但是目前调用端代码的入参是一个字符对象,那在 gRPC 里面我们是怎么把对象转成二进制数据的呢?
回到上面流程图的第 3 步,在 writePayload 之前,ClientCallImpl 里面有一行代码就是 method.streamRequest(message),看方法签名我们大概就知道它是用来把对象转成一个 InputStream,有了 InputStream 我们就很容易获得入参对象的二进制数据。这个方法返回值很有意思,就是为啥不直接返回我们想要的二进制数组,而是返回一个 InputStream 对象呢?你可以先停下来想下原因,我们会在最后继续讨论这个问题。
我们接着看 streamRequest 方法的拥有者 method 是个什么对象?我们可以看到 method 是 MethodDescriptor 对象关联的一个实例,而 MethodDescriptor 是用来存放要调用 RPC 服务的接口名、方法名、服务调用的方式以及请求和响应的序列化和反序列化实现类。
大白话说就是,MethodDescriptor 是用来存储一些 RPC 调用过程中的元数据,而在 MethodDescriptor 里面 requestMarshaller 是在绑定请求的时候用来序列化方式对象的,所以当我们调用 method.streamRequest(message) 的时候,实际是调用 requestMarshaller.stream(requestMessage) 方法,而 requestMarshaller 里面会绑定一个 Parser,这个 Parser 才真正地把对象转成了 InputStream 对象。而这个 Parser 的实现类在我们刚刚生成的基础代码里面就有,HelloReply 和 HelloRequest 中都有,下面贴出 HelloReply 的 Parser 实现类源码
private static final com.google.protobuf.Parser
PARSER = new com.google.protobuf.AbstractParser() {
@Override
public HelloReply parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new HelloReply(input, extensionRegistry);
}
};
我们在 gRPC 文档中可以看到,gRPC 的通信协议是基于标准的 HTTP/2 设计的,而 HTTP/2 相对于常用的 HTTP/1.X 来说,它最大的特点就是多路复用、双向流,该怎么理解这个特点呢?这就好比我们生活中的单行道和双行道,HTTP/1.X 就是单行道,HTTP/2 就是双行道。
那既然在请求收到后需要进行请求「断句」,那肯定就需要在发送的时候把断句的符号加上,我们看下在 gRPC 里面是怎么加的?
因为 gRPC 是基于 HTTP/2 协议,而 HTTP/2 传输基本单位是 Frame,Frame 格式是以固定 9 字节长度的 header,后面加上不定长的 payload 组成,协议格式如下图所示:

那在 gRPC 里面就变成怎么构造一个 HTTP/2 的 Frame 了。
现在回看我们上面那个流程图的第 4 步,在 write 到 Netty 里面之前,我们看到在 MessageFramer.writePayload 方法里面会间接调用 writeKnownLengthUncompressed 方法,该方法要做的两件事情就是构造 Frame Header 和 Frame Body,然后再把构造的 Frame 发送到 NettyClientHandler,最后将 Frame 写入到 HTTP/2 Stream 中,完成请求消息的发送。
接收原理
讲完 gRPC 的请求发送原理,我们再来看下服务提供方收到请求后会怎么处理?我们还是接着前面的那个例子,先看下服务提供方代码,具体如下:
package cn.mrcode.study.rpc.s06.service;

import cn.mrcode.study.rpc.s06.grpc.hello.HelloReply;
import cn.mrcode.study.rpc.s06.grpc.hello.HelloRequest;
import cn.mrcode.study.rpc.s06.grpc.hello.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
@Override
public void say(HelloRequest request, StreamObserver responseObserver) {
// 构造一个响应对象
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();

    // 将响应对象发出去
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
}

}

上面 HelloServiceImpl 类是按照 gRPC 使用方式实现了 HelloService 接口逻辑,但是对于调用者来说并不能把它调用过来,因为我们没有把这个接口对外暴露,在 gRPC 里面我们是采用 Build 模式对底层服务进行绑定,具体代码如下:
package cn.mrcode.study.rpc.s06.service;

import java.io.IOException;

import io.grpc.Server;
import io.grpc.ServerBuilder;

public class HelloWorldServer {
private Server server;

/**
 * 对外暴露服务
 **/
private void start() throws IOException {
    int port = 50051;
    server = ServerBuilder.forPort(port)
            .addService(new HelloServiceImpl())
            .build()
            .start();
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            HelloWorldServer.this.stop();
        }
    });
}

/**
 * 关闭端口
 **/
private void stop() {
    if (server != null) {
        server.shutdown();
    }
}

/**
 * 优雅关闭
 **/
private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
        server.awaitTermination();
    }
}

public static void main(String[] args) throws IOException, InterruptedException {
    final HelloWorldServer server = new HelloWorldServer();
    server.start();
    server.blockUntilShutdown();
}

}

这个时候,你可以先运行测试下,运行顺序如下
// 先运行服务端,提供服务
cn.mrcode.study.rpc.s06.service.HelloWorldServer#main

// 在运行客户端,调用服务
cn.mrcode.study.rpc.s06.HelloWorldClient#main

// 如果运行正常的话,客户端的控制台中会输出以下内容
message: "Hello world"
这里需要特别说明一下:笔者在实践过程中,如果 IDEA 的 Gradle 构建运行模式选择的是 Gradle 而不是 IDEA 的话,运行将会报错:说类重复,大概原因是,使用 gradle 运行时,它会自动构建 proto 基础代码,并加载构建的基础代码,那么就和我们手动复制的代码声明是一样的了,运行会报错
目前的解决方案是:修改配置,如下图所示

服务对外暴露的目的是让过来的请求在被还原成信息后,能找到对应接口的实现。在这之前,我们需要先保证能正常接收请求,通俗地讲就是要先开启一个 TCP 端口,让调用方可以建立连接,并把二进制数据发送到这个连接通道里面,这里依然只展示最重要的部分。

这四个步骤是用来开启一个 Netty Server,并绑定编解码逻辑的,如果你暂时看不懂,没关系的,我们可以先忽略细节。我们重点看下 NettyServerHandler 就行了,在这个 Handler 里面会绑定一个 FrameListener,gRPC 会在这个 Listener 里面处理收到数据请求的 Header 和 Body,并且也会处理 Ping、RST 命令等,具体流程如下图所示:

在收到 Header 或者 Body 二进制数据后,NettyServerHandler 上绑定的 FrameListener 会把这些二进制数据转到 MessageDeframer 里面,从而实现 gRPC 协议消息的解析 。
那你可能会问,这些 Header 和 Body 数据是怎么分离出来的呢?按照我们前面说的,调用方发过来的是一串二进制数据,这就是我们前面开启 Netty Server 的时候绑定 Default HTTP/2FrameReader 的作用,它能帮助我们按照 HTTP/2 协议的格式自动切分出 Header 和 Body 数据来,而对我们上层应用 gRPC 来说,它可以直接拿拆分后的数据来用。
总结
这是我们基础篇的最后一讲,我们采用剖析 gRPC 源码的方式来学习如何实现一个完整的 RPC。当然整个 gRPC 的代码量可比这多得多,但今天的主要目就是想让你把前面所学的序列化、协议等方面的知识落实到具体代码上,所以我们这儿只分析了 gRPC 收发请求两个过程。
实现了这两个过程,我们就可以完成一个点对点的 RPC 功能,但在实际使用的时候,我们的服务提供方通常都是以一个集群的方式对外提供服务的,所以在 gRPC 里面你还可以看到负载均衡、服务发现等功能。而且 gRPC 采用的是 HTTP/2 协议,我们还可以通过 Stream 方式来调用服务,以提升调用性能。
总的来说,其实我们可以简单地认为 gRPC 就是采用 HTTP/2 协议,并且默认采用 PB 序列化方式的一种 RPC,它充分利用了 HTTP/2 的多路复用特性,使得我们可以在同一条链路上双向发送不同的 Stream 数据,以解决 HTTP/1.X 存在的性能问题。
课后思考
我们讲到,在 gRPC 调用的时候,我们有一个关键步骤就是把对象转成可传输的二进制,但是在 gRPC 里面,我们并没有直接转成二进制数组,而是返回一个 InputStream,你知道这样做的好处是什么吗?

相关文章
|
7月前
|
机器学习/深度学习 搜索推荐 算法
广告系统:广告引擎如何做到在 0.1s 内返回广告信息?
广告系统是互联网核心营收支柱,支撑Google、Facebook等公司超80%收入。其本质是高并发、低延迟的实时检索系统,需在0.1秒内完成百万级广告匹配。本文详解广告引擎架构:通过标签过滤、树形分片优化索引;引入向量检索实现智能匹配;采用非精准打分预筛+深度学习精排的混合排序策略;并在离线索引构建时前置过滤无效广告,压缩检索空间。结合业务特点,从索引、召回到排序全方位提升性能,保障高效精准投放。
|
8月前
|
Java Nacos Sentinel
Spring Cloud Alibaba 深度实战:Nacos + Sentinel + Gateway 整合指南
本指南深入整合Spring Cloud Alibaba核心组件:Nacos实现服务注册与配置管理,Sentinel提供流量控制与熔断降级,Gateway构建统一API网关。涵盖环境搭建、动态配置、服务调用与监控,助你打造高可用微服务架构。(238字)
2108 10
|
3月前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
46839 72
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
7月前
|
存储 缓存 NoSQL
存储系统:从检索技术角度剖析 LevelDB 的架构设计思想
LevelDB是Google开源的高性能键值存储系统,基于LSM树优化,采用跳表、读写分离、SSTable分层与Compaction等技术,结合BloomFilter、缓存机制与索引分离设计,显著提升数据读写与检索效率,广泛应用于工业级系统中。(238字)
|
7月前
|
存储 机器学习/深度学习 算法
最近邻检索(下):如何用乘积量化实现「拍照识花」功能?
AI时代,以图搜图、拍图识物广泛应用。其核心是图片特征提取与高维向量相似检索。本文解析聚类算法(如K-Means)与局部敏感哈希的区别,详解乘积量化压缩向量、倒排索引加速检索的技术原理,揭示图像检索背后的高效机制。(238字)
|
7月前
|
机器学习/深度学习 数据采集 自然语言处理
搜索引擎:输入搜索词以后,搜索引擎是怎么工作的?
搜索引擎通过爬虫抓取网页,经索引系统处理生成倒排索引,再由检索系统结合分词、纠错、推荐等技术理解用户意图,利用位置信息和最小窗口排序,精准返回结果。其核心在于以查询词为约束,实现高效相关性匹配。
|
7月前
|
自然语言处理 运维 负载均衡
索引拆分:大规模检索系统如何使用分布式技术加速检索?
本文介绍了分布式技术在大规模检索系统中的应用,重点探讨了如何通过索引拆分提升检索效率。常见的拆分方式有基于业务、文档(水平拆分)和关键词(垂直拆分)。其中,基于文档的拆分更易维护:新增文档仅影响一个分片,且负载更均衡,支持副本扩容应对热点查询,系统可扩展性强,是工业界主流方案。(238字)
|
7月前
|
机器学习/深度学习 算法 搜索推荐
精准 Top K 检索:搜索结果是怎么进行打分排序的?
搜索引擎排序直接影响用户体验,核心是Top K检索。本文介绍三种打分算法:经典TF-IDF衡量词项权重;BM25在此基础上优化,引入文档长度、词频饱和等因子;机器学习则融合数百特征自动学习权重,提升排序精度。最后通过堆排序高效实现Top K结果返回,兼顾性能与效果。(239字)
|
7月前
|
存储 NoSQL 定位技术
空间检索(上):如何用 Geohash 实现「查找附近的人」功能?
本文介绍了如何高效实现“查找附近的人”功能,提出基于Geohash的区域编码与索引方案。通过将二维空间划分为带层次的编码区域,利用一维索引(如跳表、哈希表)快速检索目标区域及邻接区域用户,结合非精准与精准Top K检索策略,在保证性能的同时控制误差。适用于社交、出行等LBS场景。
|
消息中间件 存储 算法
一文详解 RocketMQ 如何利用 Raft 进行高可用保障
本文介绍 RocketMQ 如何利用 Raft(一种简单有效的分布式一致性算法)进行高可用的保障,总结了 RocketMQ 与 Raft 的前世今生。可以说 Raft 的设计给 RocketMQ 的高可用注入了非常多的养分,RocketMQ 的共识算法与高可用设计在 2023 年也得到了学术界的认可,被 CCF-A 类学术会议 ASE 23' 录用。
1181 102

热门文章

最新文章