RPC框架(1 - 实现服务端注册一个服务)

简介: RPC框架(1 - 实现服务端注册一个服务)

5.实现流程



5.1实现服务端注册一个服务


5.1.1通用接口


基于这样一个假设,那就是客户端已经知道了服务端的地址,这部分会由后续的服务发现机制完善。通用接口

public interface HelloService {
    String hello(HelloObject object);
}


hello方法需要传递一个对象,HelloObject对象,定义如下:

@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
    private Integer id;
    private String message;
}


注意这个对象需要实现Serializable接口,因为它需要在调用过程中从客户端传递给服务端。


默认的序列化机制,即实现Serializable接口即可,不需要实现任何方法。该接口没有任何方法,只是一个标记而已,告诉Java虚拟机该类可以被序列化了。然后利用ObjectOutputStream进行序列化和用ObjectInputStream进行反序列化。

问题一:为何要实现序列化?

答:序列化就是对实例对象的状态(State 对象属性而不包括对象方法)进行通用编码(如格式化的字节码)并保存,以保证对象的完整性和可传递性。简而言之:序列化,就是为了在不同时间或不同平台的JVM之间共享实例对象,只有经过序列化,才能兼容对象在磁盘文本以及在网络中的传输,以及恢复对象的时候反序列化等操作。

在Java中常见的几个类,如:Interger/String等,都实现了java.io.Serializable接口。这个序列化接口没有任何方法和域,仅用于标识序列化语意;实现 Serializable 接口的类是可序列化的,没有实现此接口的类将不能被序列化和反序列化。序列化类的所有子类本身都是可序列化的,不再需要显式实现 Serializable 接口。

如没有 实现Serializable接口,在序列化时,使用ObjectOutputStream的write(object)方法将对象保存时将会出现异常。其实 java.io.Serializable 只是一个没有属性和方法的空接口。

问题二:为何一定要实现 Serializable 才能进行序列化呢?

用 ObjectOutputStream 来持久化对象, 对于此处抛出的异常,如果被写对象类型是String、数组、Enum、Serializable,就可以进行序列化,否则将抛出NotSerializableException。

拓展:serialVersionUID 用来表明类的不同版本间的兼容性。Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常。


接着我们在服务端对这个接口进行实现,实现的方式也很简单,返回一个字符串就行:

public class HelloServiceImpl implements HelloService {
    private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
    @Override
    public String hello(HelloObject object) {
        logger.info("接收到:{}", object.getMessage());
        return "这是掉用的返回值,id=" + object.getId();
    }
}


5.1.2传输协议


服务端需要哪些信息,才能唯一确定服务端需要调用的接口的方法呢?


  • 待调用接口的名称
  • 待调用方法名称
  • 待调用方法的参数(由于方法重载的缘故)
  • 待调用方法的参数类型


重载发生在同一个类中,方法名必须相同,参数列表(参数个数、类型不同,多个参数的顺序)必须不同,方法返回值和访问修饰符可以不同,发生在编译时(编译器会根据参数列表进行匹配,确定方法,如果失败,编译器报错,这叫做重载分辨)。

每个重构的方法(构造函数)都必须有独一无二的参数类型列表。最常见的地方就是构造器的重载,即构造函数。

好处:面向对象的特点,代码逻辑更简洁、明了。


那么服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了。即RpcRequest对象:

@Data
@Builder
public class RpcRequest implements Serializable {
    /**
     * 待调用接口名称
     */
    private String interfaceName;
    /**
     * 待调用方法名称
     */
    private String methodName;
    /**
     * 调用方法的参数
     */
    private Object[] parameters;
    /**
     * 调用方法的参数类型
     */
    private Class<?>[] paramTypes;
}


那么服务器调用完这个方法后,需要给客户端返回哪些信息呢?


如果调用成功的话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一个RpcResponse对象:

@Data
public class RpcResponse<T> implements Serializable {
    /**
     * 响应状态码
     */
    private Integer statusCode;
    /**
     * 响应状态补充信息
     */
    private String message;
    /**
     * 响应数据
     */
    private T data;
    public static <T> RpcResponse<T> success(T data) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(ResponseCode.SUCCESS.getCode());
        response.setData(data);
        return response;
    }
    public static <T> RpcResponse<T> fail(ResponseCode code) {
        RpcResponse<T> response = new RpcResponse<>();
        response.setStatusCode(code.getCode());
        response.setMessage(code.getMessage());
        return response;
    }
}


这里还多写了两个静态方法,用于快速生成成功与失败的响应对象。其中,statusCode属性可以自行定义,客户端服务端一致即可。


静态方法:先于其他方法初始化,系统会为静态方法分配一个固定的内存空间(方法区中的静态方法区),只会被初始化一次。而普通方法,会随着对象的调用而加载,当使用完毕,会自动释放掉空间。

普通方法的好处是,动态规划了内存空间的使用,节省内存资源。静态方法,方便,运行快,而如果全部方法都用静态方法,那么每个方法都要有一个固定的空间,这样的话太占内存。

拓展:Java方法区存的是什么样的东西?

方法区存着类的信息,常量和静态变量,即类被编译后的数据。(线程共享的)


5.1.3客户端实现-动态代理


客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法时生成需要的RpcRequest对象并且发送给服务端


这里我们采用JDK动态代理,代理类是需要实现InvocationHandler接口的。

public class RpcClientProxy implements InvocationHandler {
    private String host;
    private int port;
    public RpcClientProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }
}


我们需要传递host和port来指明服务端的位置。并且使用getProxy()方法来生成代理对象。


InvocationHandler接口需要实现invoke()方法,来指明代理对象的方法被调用时的动作。在这里,我们显然就需要生成一个RpcRequest对象,发送出去,然后返回从服务端接收到的结果即可:

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = RpcRequest.builder()
                .interfaceName(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .parameters(args)
                .paramTypes(method.getParameterTypes())
                .build();
        RpcClient rpcClient = new RpcClient();
        return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
    }


生成RpcRequest很简单,我使用Builder模式来生成这个对象。发送的逻辑我使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去,并且接收返回的对象。

public class RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
    public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
        try (Socket socket = new Socket(host, port)) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用时有错误发生:", e);
            return null;
        }
    }
}


我的实现很简单,直接使用Java的序列化方式,通过Socket传输。创建一个Socket,获取ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。


socket通信(数据传送):socket主要包含了客户端和服务端

  • 客户端:主要用来发送数据,主要参数为server的ip地址,和端口号;然后进行流的输出,随后一定需要注意socket的shutdownOutput关闭,否则会出现异常
  • 服务端:ServerClient。服务启动后,会一直存在,不会关闭。服务器主要是依赖服务器端口是否匹配
    serversocket、进行inputstream流的读取

socket通信大致过程

  • 服务器端创建一个ServerSocket对象监听某个端口,然后调用accept()方法等待客户连接。
  • 客户端程序创建一个Socket对象,请求与服务器端程序建立连接。
  • 服务器端程序接收客户端连接请求,创建一个新的线程,在新的线程中处理调用。
  • 打开连接服务器与客户端程序中的Socket的输入/输出流(socket.getOutputStream())。
  • 利用输入/输出流,按照一定的协议对Socket进行读/写操作。
  • 关闭输入\输出流和Socket


5.1.4服务端实现-反射调用


服务端的实现就简单多了,使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池:

public class RpcServer {
    private final ExecutorService threadPool;
    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    public RpcServer() {
        int corePoolSize = 5;
        int maximumPoolSize = 50;
        long keepAliveTime = 60;
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
    }
}


这里简化了一下,RpcServer暂时只能注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立刻开始监听:

public void register(Object service, int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服务器正在启动...");
            Socket socket;
            while((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                threadPool.execute(new WorkerThread(socket, service));
            }
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }


这里向工作线程WorkerThread传入了socket和用于服务端实例service。


WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去。run方法如下:

@Override
    public void run() {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            Object returnObject = method.invoke(service, rpcRequest.getParameters());
            objectOutputStream.writeObject(RpcResponse.success(returnObject));
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            logger.error("调用或发送时有错误发生:", e);
        }
    }


其中,通过class.getMethod方法,传入方法名和方法参数类型即可获得Method对象。如果你上面RpcRequest中使用String数组来存储方法参数类型的话,这里你就需要通过反射生成对应的Class数组了。通过method.invoke方法,传入对象实例和参数,即可调用并且获得返回值。


5.1.5测试


服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了:

public class TestServer {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        RpcServer rpcServer = new RpcServer();
        // 将HelloServiceImpl注册到rpcServer,同时也启动了服务器
        rpcServer.register(helloService, 9000);
    }
}


服务端开放在9000端口。


客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的:

public class TestClient {
    public static void main(String[] args) {
        RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
        HelloService helloService = proxy.getProxy(HelloService.class);
        HelloObject object = new HelloObject(12, "This is a message");
        String res = helloService.hello(object);
        System.out.println(res);
    }
}


我们这里生成了一个HelloObject对象作为方法的参数。


首先启动服务端,再启动客户端,测试结果:

// 服务端输出:
服务器正在启动...
客户端连接!Ip为:127.0.0.1
接收到:This is a message
// 客户端输出:
这是调用的返回值,id=12


相关文章
|
11天前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
2月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
2月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
|
3月前
|
网络协议 Dubbo Java
什么是RPC?RPC和HTTP对比?RPC有什么缺点?市面上常用的RPC框架?
选择合适的RPC框架和通信协议,对于构建高效、稳定的分布式系统至关重要。开发者需要根据自己的业务需求和系统架构,综合考虑各种因素,做出适宜的技术选型。
168 1
|
5月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
272 9
|
5月前
|
JSON 负载均衡 网络协议
Rpc编程系列文章第二篇:RPC框架设计目标
Rpc编程系列文章第二篇:RPC框架设计目标
|
5月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
206 0
|
5月前
|
Dubbo Java 应用服务中间件
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
|
3月前
|
分布式计算 负载均衡 数据安全/隐私保护
什么是RPC?有哪些RPC框架?
RPC(Remote Procedure Call,远程过程调用)是一种允许运行在一台计算机上的程序调用另一台计算机上子程序的技术。这种技术屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
123 8
|
4月前
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
333 4
下一篇
无影云桌面