最简最快了解RPC核心流程

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 本文主要以最简易最快速的方式介绍RPC调用核心流程,文中以Dubbo为例。同时,会写一个简易的RPC调用代码,方便理解和记忆核心组件和核心流程。

本文主要以最简易最快速的方式介绍RPC调用核心流程,文中以Dubbo为例。同时,会写一个简易的RPC调用代码,方便理解和记忆核心组件和核心流程。

1、核心思想

RPC调用过程中,最粗矿的核心组件3个:RegistryProviderConsumer。最粗矿的流程4个:注册、订阅、通知、调用。最简单的流程图就1个:

本文会继续细粒度地拆解以上流程,拆解之前,请牢记这段话:

RPC调用,不管中间流程多么复杂,不管代码多么复杂,所有的努力也只为做2件事情:

  1. 在Consumer端,将ReferenceConfig配置的类转换成Proxy代理。

  2. 在Provider端,将ServiceConfig配置的类转换成Proxy代理。

2、核心组件

为了能在Consumer端和Provider端生成各自的Proxy代理,并且发起调用和响应,需要如下核心组件:

  • Registry:注册中心,主要是为了实现 Provider接口注册、Consumer订阅接口、接口变更通知、接口查找等功能。
  • Proxy:服务代理,核心中的核心,一切的努力都是为了生成合适的Proxy服务代理。
    • Consumer的Proxy:Consumer端根据ReferenceConfig生成Proxy,此Proxy主要用于找到合适的Provider接口,然后发起网络调用。
    • Provider的Proxy:Provider端根据ServiceConfig生成Proxy,此Proxy主要作用是通过类似反射的方法调用本地代码,再将结果返回给Consumer。
  • Protocol:服务协议,它相当于一个中间层,用于与注册中心打交道 和 封装 RPC 调用。它在初始化时会创建Client模块 与 服务端建立连接,也会生成用于远程调用的Invoker
  • Cluster:服务集群,主要用于路由、负载均衡、服务容错等。
  • Invoker:服务调用者。
    • Consumer的服务调用者主要是利用Client模块发起远程调用,然后等待Provider返回结果。
    • Provider的服务调用者主要是根据接收到的消息利用反射生成本地代理,然后执行方法,再将结果返回到Consumer。
  • Client:客户端模块,默认是Netty实现,主要用于客户端和服务端通讯(主要是服务调用),比如将请求的接口、参数、请求ID等封装起来发给Server端。
  • Server:服务端模拟,默认是Netty实现。主要是用于客户端和服务端通讯。

3、核心流程

3.1、Consumer流程

流程:

Consumer的流程实际上就是一个从ReferenceConfig 生成Proxy代理的过程。核心事情由Protocol完成。

  1. 根据ReferenceConfig生成代理
  2. 注册到注册中心、订阅注册中心事件
  3. 建立NettyClient,并且与NettyServer建立连接
  4. 生成客户端的ClientInvoker
  5. 选择负载均衡和集群容错
  6. ClientInvoker发起网络调用和等待结果

流程图:

3.2、Provider流程

流程

Provider的流程实际上就是个从ServiceConfig生成Proxy代理的过程。核心事情由PorxyProtocol完成。

  1. 根据ServiceConfig生成本地代理
  2. 注册到注册中心
  3. 启动NettyServer等待客户端连接
  4. 生成服务端Invoker
  5. Invoker监听调用请求
  6. 接收到请求后新建任务丢入到线程池去执行
  7. 执行时会生成本地代理执行(比如通过反射去调用具体的方法),再将返回结果写出去

流程图:

3.3、整体流程图

4、简易代码实现

4.1、核心代码介绍

客户端Proxy

/**
 * 获取代理Service
 */
@SuppressWarnings("unchecked")
public <T> T getService(Class clazz) throws Exception {

    return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();

            if ("equals".equals(methodName) || "hashCode".equals(methodName)) {
                throw new IllegalAccessException("不能访问" + methodName + "方法");
            }
            if ("toString".equals(methodName)) {
                return clazz.getName() + "#" + methodName;
            }

            List<RegistryInfo> registryInfoList = interfaceMethodsRegistryInfoMap.get(clazz);
            if (registryInfoList == null) {
                throw new RuntimeException("无法找到对应的服务提供者");
            }

            LoadBalancer loadBalancer = new RandomLoadBalancer();
            RegistryInfo registryInfo = loadBalancer.choose(registryInfoList);

            ChannelHandlerContext ctx = registryChannelMap.get(registryInfo);

            String identity = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
            String requestId;

            synchronized (ProxyProtocol.this) {
                requestIdWorker.increment();
                requestId = String.valueOf(requestIdWorker.longValue());
            }

            ClientInvoker clientInvoker = new DefaultClientInvoker(method.getReturnType(), ctx, requestId, identity);

            inProcessInvokerMap.put(identity + "#" + requestId, clientInvoker);

            return clientInvoker.invoke(args);
        }
    });
}

服务端Proxy

private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Protocol

public ProxyProtocol(String registryUrl, List<ServiceConfig> serviceConfigList, List<ReferenceConfig> referenceConfigList, int port) throws Exception {
    this.serviceConfigList = serviceConfigList == null ? new ArrayList<>() : serviceConfigList;
    this.registryUrl = registryUrl;
    this.port = port;
    this.referenceConfigList = referenceConfigList == null ? new ArrayList<>() : referenceConfigList;

    //1、初始化注册中心
    initRegistry(this.registryUrl);

    //2、将服务注册到注册中心
    InetAddress addr = InetAddress.getLocalHost();
    String hostName = addr.getHostName();
    String hostAddr = addr.getHostAddress();
    registryInfo = new RegistryInfo(hostName, hostAddr, this.port);
    doRegistry(registryInfo);

    //3、初始化nettyServer,启动nettyServer
    if (!this.serviceConfigList.isEmpty()) {
        nettyServer = new NettyServer(this.serviceConfigList, this.interfaceMethodMap);
        nettyServer.init(this.port);
    }

    //如果是客户端引用启动,则初始化处理线程
    if (!this.referenceConfigList.isEmpty()) {
        initProcessor();
    }
}

客户端Invoker

@Override
public T invoke(Object[] args) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("interfaces", identity);

    JSONObject param = new JSONObject();
    if (args != null) {
        for (Object obj : args) {
            param.put(obj.getClass().getName(), obj);
        }
    }
    jsonObject.put("parameter", param);
    jsonObject.put("requestId", requestId);
    String msg = jsonObject.toJSONString() + Constants.DELIMITER_STR;
    System.out.println("发送给服务端JSON为:" + msg);

    ByteBuf byteBuf = Unpooled.copiedBuffer(msg.getBytes());
    ctx.writeAndFlush(byteBuf);

    wait4Result();

    return result;
}

private void wait4Result() {
    synchronized (this) {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void setResult(String result) {
    synchronized (this) {
        this.result = (T) JSONObject.parseObject(result, returnType);
        notifyAll();
    }
}

服务端Invoker

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String message = (String) msg;
    System.out.println("提供者收到消息:" + message);
    //解析消费者发来的消息
    RpcRequest rpcRequest = RpcRequest.parse(message, ctx);
    //接受到消息,启动线程池处理消费者发过来的请求
    threadPoolExecutor.execute(new RpcInvokerTask(rpcRequest));
}

/**
 * 处理消费者发过来的请求
 */
private class RpcInvokerTask implements Runnable {
    private RpcRequest rpcRequest;

    public RpcInvokerTask(RpcRequest rpcRequest) {
        this.rpcRequest = rpcRequest;
    }

    @Override
    public void run() {
        try {
            ChannelHandlerContext ctx = rpcRequest.getCtx();
            String interfaceIdentity = rpcRequest.getInterfaceIdentity();
            String requestId = rpcRequest.getRequestId();
            Map<String, Object> parameterMap = rpcRequest.getParameterMap();

            //interfaceIdentity组成:接口类+方法+参数类型
            Map<String, String> interfaceIdentityMap = string2Map(interfaceIdentity);

            //拿出是哪个类
            String interfaceName = interfaceIdentityMap.get("interface");
            Class interfaceClass = Class.forName(interfaceName);
            Object o = interfaceInstanceMap.get(interfaceClass);

            //拿出是哪个方法
            Method method = interfaceMethodMap.get(interfaceIdentity);

            //反射执行
            Object result = null;
            String parameterStr = interfaceIdentityMap.get("parameter");
            if (parameterStr != null && parameterStr.length() > 0) {
                String[] parameterTypeClasses = parameterStr.split(",");//接口方法参数参数可能有多个,用,号隔开
                Object[] parameterInstance = new Object[parameterTypeClasses.length];
                for (int i = 0; i < parameterTypeClasses.length; i++) {
                    parameterInstance[i] = parameterMap.get(parameterTypeClasses[i]);
                }
                result = method.invoke(o, parameterInstance);
            } else {
                result = method.invoke(o);
            }

            //将结果封装成rcpResponse
            RpcResponse rpcResponse = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity, requestId);

            //ctx返回执行结果
            String resultStr = JSONObject.toJSONString(rpcResponse) + DELIMITER_STR;

            ByteBuf byteBuf = Unpooled.copiedBuffer(resultStr.getBytes());
            ctx.writeAndFlush(byteBuf);

            System.out.println("响应给客户端:" + resultStr);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Client

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Constants.DELIMITER));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new NettyClientHandler());

                    System.out.println("initChannel - " + Thread.currentThread().getName());
                }
            });
    ChannelFuture cf = bootstrap.connect(ip, port).sync();
//            cf.channel().closeFuture().sync();
    System.out.println("客户端启动成功");
} catch (Exception e) {
    e.printStackTrace();
    group.shutdownGracefully();
}

Server

public NettyServer(List<ServiceConfig> serviceConfigList, Map<String, Method> interfaceMethodMap) {
    this.serviceConfigList = serviceConfigList;
    this.interfaceMethodMap = interfaceMethodMap;
}

public int init(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, DELIMITER));
                    channel.pipeline().addLast(new StringDecoder());
                    channel.pipeline().addLast(new RpcInvokeHandler(serviceConfigList, interfaceMethodMap));
                }
            });
    ChannelFuture cf = bootstrap.bind(port).sync();
    System.out.println("启动NettyServer,端口为:" + port);
    return port;
}

4.2、项目地址

https://github.com/yclxiao/rpc-demo.git

5、总结

本文主要以Dubbo为例介绍了RPC调用核心流程,同时,写了个简易的RPC调用代码。

记住以上的流程图即可搞明白整个调用流程。然后再记住最核心的2句话:

  • 所有的努力都是为了能在Consumer端和Provider端生成功能丰富的Proxy。核心事情由Protocol完成。
  • 核心的5个部件:Registry、ProxyProtocolInvokerClientServer

本篇完结!欢迎点赞 关注 收藏!!!

原文链接:https://mp.weixin.qq.com/s/9fF2weLLBR7SChOxPEEqEA

======>>>>>> 关于我 <<<<<<======

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
网络协议 算法
一次完整的 RPC 流程
一次完整的 RPC 流程 因为 RPC 是远程调用,首先会涉及网络通信, 又因为 RPC 用于业务系统之间的数据交互,要保证数据传输的可靠性,所以它一般默认采用 TCP 来实现网络数据传输。 网络传输的数据必须是二进制数据,可是在 RPC 框架中,调用方请求的出入参数都是对象,对象不能直接在网络中传输,所以需要提前把对象转成可传输的二进制数据,转换算法还要可逆,这个过程就叫“序列化”和“反序列化”。
353 0
一次完整的 RPC 流程
|
数据格式
一个最简单的RPC服务流程(二)
一个最简单的RPC服务流程(二)
159 0
|
JSON Java Go
一个最简单的RPC服务流程
一个最简单的RPC服务流程
437 0
|
编解码 Java 数据格式
一个最简单的RPC服务流程(三)
一个最简单的RPC服务流程(三)
156 0
|
缓存 Java 应用服务中间件
深入理解 RPC 交互流程
文节我们讲解 RPC 的消息交互流程,目的是搞清楚一个简单的 RPC 方法调用背后究竟发生了怎样复杂曲折的故事,以看透 RPC 的本质。 上图是信息系统交互模型宏观示意图,RPC 的消息交互则会深入到底层。
1062 0
|
6月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
270 0
|
20天前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
3月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
2月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架