十分钟写个RPC框架

简介: 互联网时代,各种RPC框架盛行,细看各种框架,应用层面有各种变化,但是万变不离其宗,RPC最核心的部分基本都是一致的。那就是跟远程的服务器进行通信,像调用本地服务一样调用远程服务。然后在这些基础上可能会附加一些诸如,服务自动注册和发现,负载均衡,就近路由,调用链路记录,远程mock等等功能。今天想给大家分享的是,如果不考虑性能,API易用性,服务发现,负载均衡,环境隔离等其他因素,其实做一个基本功
互联网时代,各种RPC框架盛行,细看各种框架,应用层面有各种变化,但是万变不离其宗,RPC最核心的部分基本都是一致的。
那就是跟远程的服务器进行通信,像调用本地服务一样调用远程服务。
然后在这些基础上可能会附加一些诸如,服务自动注册和发现,负载均衡,就近路由,调用链路记录,远程mock等等功能。
今天想给大家分享的是,如果不考虑性能,API易用性,服务发现,负载均衡,环境隔离等其他因素,其实做一个基本功能的RPC框架,几分钟就能搞定。
个人认为RPC功能的基石有下面几个 一、序列化协议 二、远程通信报文协议 三、协议报文处理实现。
序列化协议JDK为我们提供了一套自带的序列化协议,虽然不能跨语言,压缩率不高,不过我们只为展示RPC技术,没有考虑其他,如果觉得不好可以使用Hessian ,protobuf,甚至是诸如fastjson这些json序列化的开源框架。
远程通信JDK有一套socket和输入输出流,虽然是同步的,性能不怎么样,但是只为展示RPC技术原理,不考虑其他,为了性能和吞吐量我们可以选择netty进行改造。
通信协议,我只做了一个简单的 MAGIC_NUM+两个字节报文长+序列化对象字节流 的协议,协议上可以增加很多东西,比如版本号,心跳包,状态码,可扩展的报文头等等,不过同样的,这里只为展示RPC原理,不考虑其他的。
协议报文处理部分只是通过报文体里面携带的 类名 方法名,方法参数,做了个简单的反射处理,这部分其实可以扩展的部分很多,比如预先做方法缓存,方法签名使用短命名注册等等,或者想更快还能通过字节码注入的方式自动生成一些模板代码的方式,将反射变成直接的方法调用。

下面直接展示代码吧
首先是传输的对象都是java可序列化对象:
public class RpcCommand implements Serializable{
    String className ;
    String methodName ;
    String[] argumetsType ;

    Object[] params ;
    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public String[] getArgumetsType() {
        return argumetsType;
    }

    public void setArgumetsType(String[] argumetsType) {
        this.argumetsType = argumetsType;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }


}
public class RpcResponse implements Serializable{
    boolean isException;
    Object result ;
    Exception exception;
    public boolean isException() {
        return isException;
    }

    public void setException(boolean exception) {
        isException = exception;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public Exception getException() {
        return exception;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

}

 

其次是请求对象的处理部分
package com.shock.rpc;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcHandler {

    ConcurrentHashMap<String, Object> registered = new ConcurrentHashMap<String, Object>(128);

    public RpcResponse handler(RpcCommand commond) {
        String className = commond.getClassName();
        RpcResponse response = new RpcResponse();
        try {
            Object obj = registered.get(className);
            String[] argTypes = commond.getArgumetsType();
            Class aClass = Class.forName(className);
            List<Class> argsTypeList = new ArrayList<Class>(argTypes.length);
            for (String s : argTypes) {
                argsTypeList.add(Class.forName(s));
            }
            Method method = aClass.getMethod(commond.getMethodName(),
                argsTypeList.toArray(new Class[argsTypeList.size()]));
            Object object = method.invoke(obj, commond.getParams());
            response.setResult(object);
        } catch (Exception e) {
            e.printStackTrace();
            response.setException(true);
            response.setException(e);
        }
        return response;
    }

    public void regist(Class interfa, Object object) {
        registered.put(interfa.getName(), object);
    }
}

 代码里面只有很粗暴的反射实现

第三个是服务端启动和服务端协议处理代码

package com.shock.rpc;

import com.shock.rpc.demo.IDemoImpl;
import com.shock.rpc.demo.IDemoInterface;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcServer {
    int port;

    public RpcServer(int port, RpcHandler handler) {
        this.port = port;
        this.handler = handler;
    }

    RpcHandler      handler;

    ExecutorService executorService = Executors.newFixedThreadPool(20);

    public void start() {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.submit(new WorkThread(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public class WorkThread implements Runnable {
        Socket socket;

        WorkThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
                while (true) {
                    int magic = inputStream.read();
                    //魔数
                    if (magic == 0x5A) {
                        //两个字节用来计算长度数据长度,服务传送的数据过大可能会出现截断问题
                        int length1 = inputStream.read();
                        int length2 = inputStream.read();
                        int length = (length1 << 8) + length2;
                        ByteArrayOutputStream bout = new ByteArrayOutputStream(length);
                        int sum = 0;
                        byte[] bs = new byte[length];
                        while (true) {
                            int readLength = inputStream.read(bs, 0, length - sum);
                            if (readLength > 0) {
                                bout.write(bs, 0, readLength);
                                sum += readLength;
                            }
                            if (sum >= length) {
                                break;
                            }
                        }
                        ObjectInputStream objectInputStream = new ObjectInputStream(
                            new ByteArrayInputStream(bout.toByteArray()));
                        try {
                            RpcCommand commond = (RpcCommand) objectInputStream.readObject();
                            RpcResponse response = handler.handler(commond);
                            ByteArrayOutputStream objectout = new ByteArrayOutputStream(length);
                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout);
                            objectOutputStream.writeObject(response);
                            objectOutputStream.flush();
                            byte[] commondBytes = objectout.toByteArray();
                            int len = commondBytes.length;
                            outputStream.write(0x5A);
                            outputStream.write(len >> 8);
                            outputStream.write(len & 0x00FF);
                            outputStream.write(commondBytes);
                            outputStream.flush();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("和客户端连接断开了");
            } finally {
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        RpcHandler rpcHandler = new RpcHandler();
        rpcHandler.regist(IDemoInterface.class, new IDemoImpl());
        RpcServer servcer = new RpcServer(8081, rpcHandler);
        servcer.start();
    }
}

 代码实现也很简单,就是根据前面说的传输报文协议读取传输的报文,反序列化出请求对象RpcCommand,给处理类进行处理,如果做好兼容加上版本和不同协议的话,可以增加不同的处理实现。

最后是客户端传输和协议处理代码
package com.shock.rpc;

import com.shock.rpc.demo.IDemoInterface;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcClient {

    String       host;
    int          port;
    Socket       socket;
    InputStream  inputStream;
    OutputStream outputStream;

    public RpcClient(String host, int port) {
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port));
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //这个不能并发请求,否则会出现数据流乱的情况
    public synchronized RpcResponse invoke(RpcCommand commond) {
        RpcResponse response = new RpcResponse();
        try {
            ByteArrayOutputStream objectout = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout);
            objectOutputStream.writeObject(commond);
            objectOutputStream.flush();
            byte[] commondBytes = objectout.toByteArray();
            outputStream.write(0x5A);
            int len = commondBytes.length;

            outputStream.write(len >> 8);
            outputStream.write(0x00FF & len);
            outputStream.write(commondBytes);
            outputStream.flush();
            while (true) {
                int magic = inputStream.read();
                if (magic == 0x5A) {
                    int length1 = inputStream.read();
                    int length2 = inputStream.read();
                    int length = (length1 << 8) + length2;
                    ByteArrayOutputStream bout = new ByteArrayOutputStream(length);
                    int sum = 0;
                    byte[] bs = new byte[length];
                    while (true) {
                        int readLength = inputStream.read(bs, 0, length - sum);
                        if (readLength > 0) {
                            bout.write(bs, 0, readLength);
                            sum += readLength;
                        }
                        if (sum >= length) {
                            break;
                        }
                    }
                    ObjectInputStream objectInputStream = new ObjectInputStream(
                        new ByteArrayInputStream(bout.toByteArray()));
                    RpcResponse response1 = (RpcResponse) objectInputStream.readObject();
                    return response1;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return response;
    }

    public static void main(String[] args) {
        RpcClient client = new RpcClient("localhost", 8081);
        RpcCommand command = new RpcCommand();
        command.setClassName(IDemoInterface.class.getName());
        command.setMethodName("noArgument");
        command.setArgumetsType(new String[0]);
        RpcResponse response = client.invoke(command);

        RpcCommand command2 = new RpcCommand();
        command2.setClassName(IDemoInterface.class.getName());
        command2.setMethodName("withReturn");
        command2.setArgumetsType(new String[] { "java.lang.String" });
        command2.setParams(new String[] { "shocklee" });
        RpcResponse response2 = client.invoke(command2);
        System.out.println(response.getResult());
        System.out.println(response2.getResult());
    }
}

 

至此整个框架部分已经完成,暂时还没有做常见的rpc客户端api包装,比如包装成从某个容器里面根据接口取出一个远程对象,直接调用远程对象的方法。
最后贴个测试类和接口
package com.shock.rpc.demo;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.demo.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public interface IDemoInterface {

    public String withReturn(String name);
    public void noReturn(String name);
    public String noArgument();
}
package com.shock.rpc.demo;

/**
 * ${DESCRIPTION}
 * com.shock.rpc.demo.${CLASS_NAME}
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class IDemoImpl implements IDemoInterface {
    @Override
    public String withReturn(String name) {
        System.out.println("withReturn "+name);
        return "hello " + name;
    }

    @Override
    public void noReturn(String name) {
        System.out.println("noReturn "+name);
    }

    @Override
    public String noArgument() {
        System.out.println("noArgument");
        return "noArgument";
    }
}

 

整个RPC功能已经都贴出来了,代码没有做过整理,没有把序列化/反序列代码抽象,协议部分也没做抽象,只是想写的快点,能够在短时间内写出来和标题十分钟对应上,所以可能代码难看点,不过整体已经展示出来了,关键代码是不需要使用任何第三方框架和工具包的。

欢迎大家进行拍砖。

另外打个广告吧,本人写了一个稍微复杂点的RPC放在github上,有木有同学想一起进行写着玩的的,赶紧约起啊,代码地址是
https://github.com/shocklee6315/simpleRpcServer
目录
相关文章
|
9月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
492 9
|
9月前
|
JSON 负载均衡 网络协议
Rpc编程系列文章第二篇:RPC框架设计目标
Rpc编程系列文章第二篇:RPC框架设计目标
|
9月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
386 0
|
9月前
|
Dubbo Java 应用服务中间件
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
|
3月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
6月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
8月前
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
516 11
|
5月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
6月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
155 1
|
7月前
|
分布式计算 负载均衡 数据安全/隐私保护
什么是RPC?有哪些RPC框架?
RPC(Remote Procedure Call,远程过程调用)是一种允许运行在一台计算机上的程序调用另一台计算机上子程序的技术。这种技术屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
197 8