手写RPC-简陋版

简介: 手写RPC

一、前言


最近不小心被隔离,放假思考一番,决定开始在手写序列。这个序列在之前看Nacous和网关源码的时候就有想法,只是一直没落实下来,趁着隔离行动起来。

二、必备知识介绍


序列化与反序列化

序列化是把对象的状态信息转化为可存储或传输的形式过程,也就是把对象转化为字节序列的过程称为对象的序列化;

反序列化是序列化的逆向过程,把字节数组反序列化为对象,把字节序列恢复为对象的过程成为对象的反序列化;

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和对象输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

需要注意的是,被序列化的对象需要实现 java.io.Serializable 接口。Java 的序列化机制是通过判断类的 serialVersionUID 来验证版本一致性的。在进行反序列化时,JVM 会把传来的字节流中的 serialVersionUID 与本地相应实体类的 serialVersionUID 进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常,即是 InvalidCastException。

另外一个需要注意的就是transient关键字,被transient修饰的属性不会被序列化,如果从重写writeobject和readobject则可以重新被序列化。在JDK中的案例就是ArryList中修饰Object[]的数组使用transient关节字,保证传输过程中不照成浪费,只传输有用的值。本质是是通过反射来实现调用writeobject和readobject。

什么是Socket通信

Socket 的原意是“插座”,在计算机通信领域,Socket 被翻译为“套接字”,它是计算机之间进行通信的一种约定或一种方式。通过 Socket 这种约定,一台计算机可以接收其他计算机的数据,也可以向其他计算机发送数据。

三、RPC原理介绍


RPC是什么

所谓的RPC其实是为了不同主机的两个进程间通信而产生的,通常不同的主机之间的进程通信,程序编写需要考虑到网络通信的功能,这样程序的编写将会变得复杂。RPC就来解决这一问题的,一台主机上的进程对另外一台主机的进程发起请求时,内核会将请求转交给RPC client,RPC client经过报文的封装转交给目标主机的RPC server,RPC server就将报文进行解析,还原成正常的请求,转交给目标主机上的目标进程。在我们看来在就像是在同一台主机上的两个进程通信一样,完全没有意识到是在不同的主机上。因此RPC其实也可以看做是一种协议或者是编程框架,目的是为了简化分布式程序的编写。

RPC基本流程

img

  1. Rpc Client通过传入的IP、端口号、调用类以及方法的参数,通过动态代理找到具体的调用类的方法,将请求的类、方法序列化,传输到服务端;
  2. 当Rpc Service收到请求以后,将传入类和方法反序列化,通过反射找到对应的类的方法进行调用,最后将返回结果进行序列化,返回客户端;
  3. Rpc Client收到返回值以后,进行反序列化,最后将结果展示;

四、手撸RPC


从RPC的基本流程可以看到,对于RPC性能来说可以提升的主要两个地方分别是序列化工具以及通信框架,在我们整个手撸系列里面会一步一步将其中的组件提升为高性能的组件,从阻塞IO到NIO,从JDK原始序列化框架到现在五花把门序列化框架,从手动的创建对象到Spring自动化创建对象,注册中心引入等等,整个过程还会伴随知识介绍,让我们一起携手共进。

迈出第一步

第一步我们只做到支持一个类的远程调用,采用JDK携带的序列化和反序列的工具以及阻塞连接的方式。

img

整体项目结构分为三部分,rpc-api作为Api提供,rpc-common主要是提供公共封装供client和service调用,rpc-v1包括rpc-v1-client主要是客户端调用封装,rpc-v1-service作为Api实现以及暴露对应方法,以后每次做的更改我都会新增一个版本,这样会方便新手进行学习。

Service端

img

服务端的实现采用ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用,核心类就是RpcProxyService和ProcessorHandler,实现如下:

RpcProxyService
@Slf4j
public class RpcProxyService {
    private ExecutorService threadPool;
    public RpcProxyService() {
        int corePoolSize = 5;
        int maximumPoolSize = 200;
        long keepAliveTime = 60;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("socket-pool-").build();
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
    }
    /**
     * 暴露方法,在注册完成以后服务后立刻开始监听
     *
     * @param service
     * @param port
     */
    public void register(Object service, int port) {
        try (ServerSocket serverSocket = new ServerSocket(port);) {
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                log.info("客户端连接IP为:" + socket.getInetAddress());
                threadPool.execute(new ProcessorHandler(socket, service));
            }
        } catch (IOException e) {
            log.error("连接异常", e);
        }
    }
}
ProcessorHandler
@Slf4j
public class ProcessorHandler implements Runnable {
    private Socket socket;
    private Object service;
    public ProcessorHandler(Socket socket, Object service) {
        this.service = service;
        this.socket = socket;
    }
    @Override
    public void run() {
        try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
            ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
            //从输入流读取参数
            RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
            //通过反射获取到方法
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            //执行方法
            Object result = method.invoke(service, rpcRequest.getParameters());
            outputStream.writeObject(RpcResponse.ok(result));
            outputStream.flush();
        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException exception) {
            //此处可再次进行包装将异常情况分类
            log.error("调用时发生错误", exception);
        }
    }
}

Client端

img

Client端通过RpcClientProxy动态代理(采用JDK动态代理)生成代理对象,然后通过执行RemoteInvocationHandler的invoke来确定调用具体的类和方法,也就是构建RpcRequest对象,最后通过RpcClient发起远程调用。

RpcClientProxy
public class RpcClientProxy {
    public <T> T getProxy(Class<T> interfaceClass, String host, int port) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RemoteInvocationHandler(host, port));
    }
}
RemoteInvocationHandler
public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //构造请求参数
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        //发送请求
        RpcClient rpcClient = new RpcClient();
        return ((RpcResponse) rpcClient.send(rpcRequest, host, port)).getData();
    }
}
RpcClient
@Slf4j
public class RpcClient {
    public Object send(RpcRequest rpcRequest, String host, int port) {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
            //序列化
            outputStream.writeObject(rpcRequest);
            outputStream.flush();
            return inputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            log.error("调用时发生异常", e);
            return null;
        }
    }
}

Common端

Common端目前做入参和出参封装,代码如下:

RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {
    /**
     * 接口名称
     */
    private String interfaceName;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     * 参数
     */
    private Object[] parameters;
    /**
     * 参数类型
     */
    private Class<?>[] paramTypes;
}
RpcResponse
@Data
public class RpcResponse<T> implements Serializable {
    /**
     * 状态码
     */
    private Integer code;
    /**
     * 提醒信息
     */
    private String message;
    /**
     * 返回信息
     */
    private T data;
    public static <T> RpcResponse<T> ok(T data) {
        RpcResponse<T> rpcResponse = new RpcResponse<>();
        rpcResponse.setCode(ResponseCode.SUCCESS.getCode());
        rpcResponse.setData(data);
        rpcResponse.setMessage(rpcResponse.getMessage());
        return rpcResponse;
    }
    public static <T> RpcResponse<T> error(int code, String message) {
        RpcResponse<T> rpcResponse = new RpcResponse<>();
        rpcResponse.setCode(code);
        rpcResponse.setMessage(message);
        return rpcResponse;
    }
}

整体代码我已经上传github,对于初学者一定要联调一下,理解清楚整体的RPC流程。


目录
打赏
0
0
0
0
28
分享
相关文章
为了带你搞懂RPC,我们手写了一个RPC框架
如今,分布式系统大行其道,RPC 有着举足轻重的地位。Dubbo、Thrift、gRpc 等框架各领风骚,学习RPC是新手也是老鸟的必修课。本文带你手撸一个rpc-spring-starter,深入学习和理解rpc相关技术,包括但不限于 RPC 原理、动态代理、Javassist 字节码增强、服务注册与发现、Netty 网络通讯、传输协议、序列化、包压缩、TCP 粘包、拆包、长连接复用、心跳检测、SpringBoot 自动装载、服务分组、接口版本、客户端连接池、负载均衡、异步调用等知识。
449 1
为了带你搞懂RPC,我们手写了一个RPC框架
为了带你搞懂RPC,我们手写了一个RPC框架
如今分布式系统大行其道的年代,RPC 有这举足轻重的地位。风靡的 Duboo、Thrift、gRpc 等框架各领风骚,深入了解RPC是新手也是老鸟的必修课。你知道 RPC 的实现原理吗?想动手实现一个简单的 RPC 框架吗?本文将通过一个 RPC 项目带你寻找答案,大量代码展示,干货满满。
1623 3
为了带你搞懂RPC,我们手写了一个RPC框架
手写RPC框架第三章《RPC中间件》
1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。 2、客户端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。 3、服务端配置生产者的信息后,在加载xml时候由中间件生成动态代理类,当发生发放调用时实际则调用了我们代理类的方法,代理里会通过netty的futuer通信方式进行数据交互。
509 7
手写类似dubbo的rpc框架第二章《netty通信》
在我们实现rpc框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们rpc框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。 这里我们选择netty作为我们的socket框架,采用future方式进行通信。
131 0
手写类似dubbo的rpc框架第二章《netty通信》
手写类似dubbo的rpc框架第三章《rpc框架》
本章将实现rpc的基础功能;提供一给rpc中间件jar给生产端和服务端。
138 0
手写类似dubbo的rpc框架第三章《rpc框架》
向高手进阶,从 0 开始手写实现一个 RPC 框架!
向高手进阶,从 0 开始手写实现一个 RPC 框架!
157 0
向高手进阶,从 0 开始手写实现一个 RPC 框架!
手写RPC框架第二章《netty通信》
在我们实现rpc框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们rpc框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。 这里我们选择netty作为我们的socket框架,采用future方式进行通信。
153 0
手写RPC框架第一章《自定义配置xml》
本案例通过三个章节来实现一共简单的rpc框架,用于深入学习rpc框架是如何通信的,当前章节主要介绍如何自定义xml文件并进行解析。想解析自定义的xml首先定义自己的xsd文件,并且实现spring的NamespaceHandlerSupport、BeanDefinitionParser,两个方法进行处理。
213 0
手写类似dubbo的rpc框架第一章《自定义配置xml》
本案例通过三个章节来实现一共简单的rpc框架,用于深入学习rpc框架是如何通信的,当前章节主要介绍如何自定义xml文件并进行解析。想解析自定义的xml首先定义自己的xsd文件,并且实现spring的NamespaceHandlerSupport、BeanDefinitionParser,两个方法进行处理。
121 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等