RPC框架(3 - 实现Netty传输和通用序列化接口)

简介: RPC框架(3 - 实现Netty传输和通用序列化接口)

5.3实现Netty传输和通用序列化接口



核心:将传统的 BIO 方式传输换成效率更高的 NIO 方式,使用Netty(并非Java原生NIO);实现通用的序列化接口,为多种序列化支持做准备,自定义传输的协议。


5.3.1Netty 服务端与客户端


首先就需要在 pom.xml 中加入 Netty 依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>${netty-version}</version>
</dependency>


netty 的最新版本可以在 maven repository查到,注意使用 netty 4 而不是 netty 5。


为了保证通用性,我们可以把 Server 和 Client 抽象成两个接口,分别是 RpcServer 和 RpcClient

public interface RpcServer {
    void start(int port);
}
public interface RpcClient {
    Object sendRequest(RpcRequest rpcRequest);
}


而原来的 RpcServer 和 RpcClient 类实际上是上述两个接口的 Socket 方式实现类,改成 SocketServer 和 SocketClient 并实现上面两个接口即可,几乎不需要做什么修改。

我们的任务,就是要实现 NettyServer 和 NettyClient。


这里提一个改动,就是在 DefaultServiceRegistry.java 中,将包含注册信息的 serviceMap 和 registeredService 都改成了 static ,这样就能保证全局唯一的注册信息,并且在创建 RpcServer 时也就不需要传入了。


NettyServer的实现很传统:

public class NettyServer implements RpcServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    @Override
    public void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 256)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new CommonEncoder(new JsonSerializer()));
                            pipeline.addLast(new CommonDecoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("启动服务器时有错误发生: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


了解过 Netty 的同学可能知道,Netty 中有一个很重要的设计模式——责任链模式,责任链上有多个处理器,每个处理器都会对数据进行加工,并将处理后的数据传给下一个处理器。代码中的 CommonEncoder、CommonDecoder和NettyServerHandler 分别就是编码器,解码器和数据处理器。因为数据从外部传入时需要解码,而传出时需要编码,类似计算机网络的分层模型,每一层向下层传递数据时都要加上该层的信息,而向上层传递时则需要对本层信息进行解码。


而 NettyClient 的实现也很类似:

public class NettyClient implements RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    private String host;
    private int port;
    private static final Bootstrap bootstrap;
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    static {
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new CommonDecoder())
                                .addLast(new CommonEncoder(new JsonSerializer()))
                                .addLast(new NettyClientHandler());
                    }
                });
    }
    @Override
    public Object sendRequest(RpcRequest rpcRequest) {
        try {
            ChannelFuture future = bootstrap.connect(host, port).sync();
            logger.info("客户端连接到服务器 {}:{}", host, port);
            Channel channel = future.channel();
            if(channel != null) {
                channel.writeAndFlush(rpcRequest).addListener(future1 -> {
                    if(future1.isSuccess()) {
                        logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
                    } else {
                        logger.error("发送消息时有错误发生: ", future1.cause());
                    }
                });
                channel.closeFuture().sync();
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
                RpcResponse rpcResponse = channel.attr(key).get();
                return rpcResponse.getData();
            }
        } catch (InterruptedException e) {
            logger.error("发送消息时有错误发生: ", e);
        }
        return null;
    }
}


静态代码块中就直接配置好了 Netty 客户端,等待发送数据时启动,channel 将 RpcRequest 对象写出,并且等待服务端返回的结果。注意这里的发送是非阻塞的,所以发送后会立刻返回,而无法得到结果。这里通过 AttributeKey 的方式阻塞获得返回结果:

AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();


通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler 中看到放入的过程。


5.3.2自定义协议与编解码器


在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。


我们定义的协议是这样的:

+---------------+---------------+-----------------+-------------+
|  Magic Number |  Package Type | Serializer Type | Data Length |
|    4 bytes    |    4 bytes    |     4 bytes     |   4 bytes   |
+---------------+---------------+-----------------+-------------+
|                          Data Bytes                           |
|                   Length: ${Data Length}                      |
+---------------------------------------------------------------+


  • 4 字节魔数,表示一个协议包,通过固定数值和字符串标识这个协议包。
  • Package Type,标明这是一个调用请求还是调用响应
  • Serializer Type 标明了实际数据使用的序列化器,这个服务端和客户端应当使用统一标准
  • Data Length 就是实际数据的长度,设置这个字段主要防止粘包,最后就是经过序列化后的实际数据,可能是 RpcRequest 也可能是 RpcResponse 经过序列化后的字节,取决于 Package Type。


规定好协议后,我们就可以来看看 CommonEncoder(编码器) 了:

public class CommonEncoder extends MessageToByteEncoder {
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    private final CommonSerializer serializer;
    public CommonEncoder(CommonSerializer serializer) {
        this.serializer = serializer;
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        out.writeInt(MAGIC_NUMBER);
        if(msg instanceof RpcRequest) {
            out.writeInt(PackageType.REQUEST_PACK.getCode());
        } else {
            out.writeInt(PackageType.RESPONSE_PACK.getCode());
        }
        out.writeInt(serializer.getCode());
        byte[] bytes = serializer.serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}


CommonEncoder 继承了MessageToByteEncoder 类,见名知义,就是把 Message(实际要发送的对象)转化成 Byte 数组。CommonEncoder 的工作很简单,就是把 RpcRequest 或者 RpcResponse 包装成协议包。 根据上面提到的协议格式,将各个字段写到管道里就可以了,这里serializer.getCode() 获取序列化器的编号,之后使用传入的序列化器将请求或响应包序列化为字节数组写入管道即可。


CommonDecoder 的工作就更简单了:

public class CommonDecoder extends ReplayingDecoder {
    private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magic = in.readInt();
        if(magic != MAGIC_NUMBER) {
            logger.error("不识别的协议包: {}", magic);
            throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
        }
        int packageCode = in.readInt();
        Class<?> packageClass;
        if(packageCode == PackageType.REQUEST_PACK.getCode()) {
            packageClass = RpcRequest.class;
        } else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
            packageClass = RpcResponse.class;
        } else {
            logger.error("不识别的数据包: {}", packageCode);
            throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
        }
        int serializerCode = in.readInt();
        CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
        if(serializer == null) {
            logger.error("不识别的反序列化器: {}", serializerCode);
            throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
        }
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Object obj = serializer.deserialize(bytes, packageClass);
        out.add(obj);
    }
}


CommonDecoder 继承自 ReplayingDecoder ,与 MessageToByteEncoder 相反,它用于将收到的字节序列还原为实际对象。主要就是一些字段的校验,比较重要的就是取出序列化器的编号,以获得正确的反序列化方式,并且读入 length 字段来确定数据包的长度(防止粘包),最后读入正确大小的字节数组,反序列化成对应的对象。


5.3.3序列化接口


序列化器接口(CommonSerializer)如下:

public interface CommonSerializer {
    byte[] serialize(Object obj);
    Object deserialize(byte[] bytes, Class<?> clazz);
    int getCode();
    static CommonSerializer getByCode(int code) {
        switch (code) {
            case 1:
                return new JsonSerializer();
            default:
                return null;
        }
    }
}


主要就是四个方法,序列化,反序列化,获得该序列化器的编号,已经根据编号获取序列化器,这里我已经写了一个示例的 JSON 序列化器,Kryo 序列化器会在后面讲解。


作为一个比较简单的例子,我写了一个 JSON 的序列化器:

public class JsonSerializer implements CommonSerializer {
    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
    private ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public byte[] serialize(Object obj) {
        try {
            return objectMapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {
            logger.error("序列化时有错误发生: {}", e.getMessage());
            e.printStackTrace();
            return null;
        }
    }
    @Override
    public Object deserialize(byte[] bytes, Class<?> clazz) {
        try {
            Object obj = objectMapper.readValue(bytes, clazz);
            if(obj instanceof RpcRequest) {
                obj = handleRequest(obj);
            }
            return obj;
        } catch (IOException e) {
            logger.error("反序列化时有错误发生: {}", e.getMessage());
            e.printStackTrace();
            return null;
        }
    }
    /*
        这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
        需要重新判断处理
     */
    private Object handleRequest(Object obj) throws IOException {
        RpcRequest rpcRequest = (RpcRequest) obj;
        for(int i = 0; i < rpcRequest.getParamTypes().length; i ++) {
            Class<?> clazz = rpcRequest.getParamTypes()[i];
            if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
                byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
                rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
            }
        }
        return rpcRequest;
    }
    @Override
    public int getCode() {
        return SerializerCode.valueOf("JSON").getCode();
    }
}


JSON 序列化工具我使用的是 Jackson,在 pom.xml 中添加依赖即可。序列化和反序列化都比较循规蹈矩,把对象翻译成字节数组,和根据字节数组和 Class 反序列化成对象。这里有一个需要注意的点,就是在 RpcRequest 反序列化时,由于其中有一个字段是 Object 数组,在反序列化时序列化器会根据字段类型进行反序列化,而 Object 就是一个十分模糊的类型,会出现反序列化失败的现象,这时就需要 RpcRequest 中的另一个字段 ParamTypes 来获取到 Object 数组中的每个实例的实际类,辅助反序列化,这就是 handleRequest() 方法的作用。


上面提到的这种情况不会在其他序列化方式中出现,因为其他序列化方式是转换成字节数组,会记录对象的信息,而 JSON 方式本质上只是转换成 JSON 字符串,会丢失对象的类型信息。


5.3.4NettyServerHandler 和 NettyClientHandler


NettyServerHandler 和 NettyClientHandler 都分别位于服务器端和客户端责任链的尾部,直接和 RpcServer 对象或 RpcClient 对象打交道,而无需关心字节序列的情况。


NettyServerhandler 用于接收 RpcRequest,并且执行调用,将调用结果返回封装成 RpcResponse 发送出去。

public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private static RequestHandler requestHandler;
    private static ServiceRegistry serviceRegistry;
    static {
        requestHandler = new RequestHandler();
        serviceRegistry = new DefaultServiceRegistry();
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
        try {
            logger.info("服务器接收到请求: {}", msg);
            String interfaceName = msg.getInterfaceName();
            Object service = serviceRegistry.getService(interfaceName);
            Object result = requestHandler.handle(msg, service);
            ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result));
            future.addListener(ChannelFutureListener.CLOSE);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("处理过程调用时有错误发生:");
        cause.printStackTrace();
        ctx.close();
    }
}


处理方式和 Socket 中的逻辑基本一致,不做讲解。


NettyClientHandler

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        try {
            logger.info(String.format("客户端接收到消息: %s", msg));
            AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
            ctx.channel().attr(key).set(msg);
            ctx.channel().close();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("过程调用时有错误发生:");
        cause.printStackTrace();
        ctx.close();
    }
}


这里只需要处理收到的消息,即 RpcResponse 对象,由于前面已经有解码器解码了,这里就直接将返回的结果放入 ctx 中即可。


5.3.5测试netty方式

public class NettyTestServer {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        ServiceRegistry registry = new DefaultServiceRegistry();
        registry.register(helloService);
        NettyServer server = new NettyServer();
        server.start(9999);
    }
}
public class NettyTestClient {
    public static void main(String[] args) {
        RpcClient client = new NettyClient("127.0.0.1", 9999);
        RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
        HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
        HelloObject object = new HelloObject(12, "This is a message");
        String res = helloService.hello(object);
        System.out.println(res);
    }
}


注意这里 RpcClientProxy 通过传入不同的 Client(SocketClient、NettyClient)来切换客户端不同的发送方式。


执行后可以获得与之前类似的结果。

相关文章
|
3月前
|
NoSQL 前端开发 Java
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
Lettuce的特性和内部实现问题之Lettuce基于Netty框架实现的问题如何解决
|
2月前
|
编解码 分布式计算 网络协议
Netty高性能网络框架(一)
Netty高性能网络框架(一)
|
8天前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
38 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
15天前
|
存储 前端开发 JavaScript
前端的全栈之路Meteor篇(四):RPC方法注册及调用-更轻量的服务接口提供方式
RPC机制通过前后端的`callAsync`方法实现了高效的数据交互。后端通过`Meteor.methods()`注册方法,支持异步操作;前端使用`callAsync`调用后端方法,代码更简洁、易读。本文详细介绍了Methods注册机制、异步支持及最佳实践。
|
15天前
|
JSON 前端开发 数据格式
前端的全栈之路Meteor篇(五):自定义对象序列化的EJSON介绍 - 跨设备的对象传输
EJSON是Meteor框架中扩展了标准JSON的库,支持更多数据类型如`Date`、`Binary`等。它提供了序列化和反序列化功能,使客户端和服务器之间的复杂数据传输更加便捷高效。EJSON还支持自定义对象的定义和传输,通过`EJSON.addType`注册自定义类型,确保数据在两端无缝传递。
|
3月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
2月前
|
设计模式 缓存 算法
Netty框架的重要性
Netty框架的重要性
|
2月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
3月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连