手把手教你写一个RPC框架(三)

简介: 手把手教你写一个RPC框架(三)

七 序列化


RPC框架中,离不开网络请求,服务A调用服务B的方法,要发送一个网络请求,服务B收到网络请求后,解析请求,然后把方法的执行结果返回给服务A。为了实现这些步骤,需要编写消息请求体、消息相应体、序列化与反序列化的相关逻辑。下面一起来看看怎么写吧~


import lombok.Data;
import java.io.Serializable;
/**
 * RPC请求体
 *
 * @author zhongmingyi
 * @date 2021/12/12 1:01 下午
 */
@Data
public class RpcRequest implements Serializable {
    /**
     * 所请求的服务接口名
     */
    private String serviceName;
    /**
     * 所请求的服务接口中,具体的方法名
     */
    private String method;
    /**
     * 所请求的方法的参数类型
     */
    private Class<?>[] methodParameterTypes;
    /**
     * 所请求的方法的参数
     */
    private Object[] methodParameters;
}

上面这四个属性是一个RPC请求中必不可少的


  • 所请求的服务接口名
  • 所请求的服务接口中,具体的方法名
  • 所请求的方法的参数类型
  • 所请求的方法的参数


有了这些属性,我们才能从注册中心中找到对应的服务。当然有小伙伴肯定会问,RpcRequest中并没有所请求服务的IP地址和端口号,该怎么找到对应的地址啊?


其实在上一节,在注册中心编写的代码中,就有写到注册的逻辑:

package com.zhongger.rpc.register.impl;
import com.alibaba.fastjson.JSON;
import com.zhongger.rpc.entity.ServerNode;
import com.zhongger.rpc.register.RpcServiceRegister;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
/**
 * @author zhongmingyi
 * @date 2021/12/11 12:20 下午
 */
public class ZookeeperRpcServiceRegister implements RpcServiceRegister {
       @Override
       public void register(ServiceNode serviceNode) throws Exception {
        logger.info("register server node info is {}", serviceNode);
        String uri = JSON.toJSONString(serviceNode);
        uri = URLEncoder.encode(uri, "UTF-8");
        String servicePath = "/com/zhongger/rpc/" + serviceNode.getServiceName() + "/service";
        // 创建永久节点
        if (zookeeperClient.checkExists().forPath(servicePath) == null) {
            logger.info("service path {} not exist create persistent node ", servicePath);
            zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(servicePath);
        }
        //创建临时节点
        String uriPath = servicePath + "/" + uri;
        logger.info("uri path is {}", uriPath);
        if (zookeeperClient.checkExists().forPath(uriPath) != null) {
            zookeeperClient.delete().forPath(uriPath);
        }
        zookeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(uriPath);
    }
}

其中ServerNode实体类就包含了IP地址、端口号、服务名称的信息,比如我有个接口HelloService,IP地址127.0.0.1,端口8888:注册到Zookeeper中的结构就是这样的,永久节点是:/com/zhongger/rpc/HelloService/service 临时节点则是:/com/zhongger/rpc/HelloService/service/{ServiceNode的JSON结构},所以RpcRequest其实就不需要Ip地址和port这些属性了。另外,寻找对应方法的时候,只需要遍历/com/zhongger/rpc/HelloService/service下的所有子节点,然后通过负载均衡来选择一个Ip地址+Port来调用对应的方法就行啦,后面会详细介绍这里的实现。


写完了请求,那么来编写响应,响应就比较简单了,就是把方法的执行结果封装一下:

package com.zhongger.rpc.entity;
import lombok.Data;
import java.io.Serializable;
/**
 * RPC响应
 *
 * @author zhongmingyi
 * @date 2021/12/12 4:38 下午
 */
@Data
public class RpcResponse implements Serializable {
    /**
     * 状态
     */
    private String status;
    /**
     * 返回结果
     */
    private Object value;
    /**
     * 异常
     */
    private Exception exception;
}


那么对于网络传输,我们需要把对象序列化成byte数组,然后我们要操作对象的话,则需要把byte数组反序列化成对象。


1 定义序列化协议


写一个接口,来约定序列化的协议

/**
 * 序列化协议
 *
 * @author zhongmingyi
 * @date 2021/12/14 4:03 下午
 */
public interface MessageSerializationProtocol {
    byte[] marshal(Object object) throws Exception;
    <T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception;
}


marshal:将对象序列化成byte[]

unMarshal:将byte[]反序列化成指定Class的对象


2 JDK序列化


定义 JdkMessageSerializationProtocol 类实现 MessageSerializationProtocol接口,JDK序列化的实现如下:

import com.zhongger.rpc.serialization.MessageSerializationProtocol;
import java.io.*;
/**
 * JDK序列化
 *
 * @author zhongmingyi
 * @date 2021/12/14 4:13 下午
 */
public class JdkMessageSerializationProtocol implements MessageSerializationProtocol {
    @Override
    public byte[] marshal(Object object) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(object);
        byte[] result = byteArrayOutputStream.toByteArray();
        objectOutputStream.close();
        byteArrayOutputStream.close();
        return result;
    }
    @Override
    public <T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        T result = clazz.cast(objectInputStream.readObject());
        objectInputStream.close();
        byteArrayInputStream.close();
        return result;
    }
}

JDK序列化的方式效率是比较低的,于是我用了比较流行的Kryo序列化框架又实现了一套序列化协议


3 Kryo序列化


使用Kryo要注意:Kryo是非线程安全的,需要ThreadLocal来防止出现线程安全问题

package com.zhongger.rpc.serialization.impl;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.zhongger.rpc.entity.RpcRequest;
import com.zhongger.rpc.entity.RpcResponse;
import com.zhongger.rpc.serialization.MessageSerializationProtocol;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
 * Kryo序列化
 *
 * @author zhongmingyi
 * @date 2021/12/14 5:32 下午
 */
public class KryoMessageSerializationProtocol implements MessageSerializationProtocol {
    /**
     * Kryo是非线程安全的,需要ThreadLocal来防止出现线程安全问题
     */
    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(RpcRequest.class);
        kryo.register(RpcResponse.class);
        return kryo;
    });
    @Override
    public byte[] marshal(Object object) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        Kryo kryo = kryoThreadLocal.get();
        kryo.writeObject(output, object);
        kryoThreadLocal.remove();
        return output.toBytes();
    }
    @Override
    public <T> T unMarshal(byte[] bytes, Class<T> clazz) throws Exception {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        Kryo kryo = kryoThreadLocal.get();
        Object o = kryo.readObject(input, clazz);
        kryoThreadLocal.remove();
        return clazz.cast(o);
    }
}

好了序列化协议就这样写好了,接下来,就要写比较复杂的网络通信咯~

相关文章
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
950 9
|
JSON 负载均衡 网络协议
Rpc编程系列文章第二篇:RPC框架设计目标
Rpc编程系列文章第二篇:RPC框架设计目标
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
1007 0
|
Dubbo Java 应用服务中间件
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
Rpc编程系列文章第三篇:Hessian RPC一个老的RPC框架
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
913 94
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
分布式计算 负载均衡 数据安全/隐私保护
什么是RPC?有哪些RPC框架?
RPC(Remote Procedure Call,远程过程调用)是一种允许运行在一台计算机上的程序调用另一台计算机上子程序的技术。这种技术屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
859 8
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
1236 1