Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读

简介: Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读


概述

Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。

ObjectEncoder

ObjectEncoder 是 Netty 中用于将对象编码为字节流的一种组件。在 Netty 的 pipeline 中,当你需要将某个对象发送到网络时,你可以使用 ObjectEncoder 来实现。它会将对象序列化为字节流,以便可以在网络中传输。

例如,当你使用 Netty 的 Bootstrap 类来配置你的客户端时,你可以为你的 channel pipeline 添加一个 ObjectEncoder

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectEncoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectEncoder 被添加到了 channel 的 pipeline 中,这样在数据传输过程中,发送的对象就会被自动编码为字节流。


ObjectDecoder

ObjectEncoder 相对应,ObjectDecoder 是用于将接收到的字节流解码为对象的组件。当你在 Netty 的 pipeline 中接收到字节流时,你可以使用 ObjectDecoder 来自动将字节流反序列化为对象。

继续上面的例子,如果你想在 pipeline 中添加 ObjectDecoder,你可以这样做:

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new ObjectDecoder());
               // 添加其他 handlers...
           }
       });

在这个例子中,ObjectDecoder 被添加到了 channel 的 pipeline 中,这样在数据接收过程中,接收到的字节流就会被自动解码为对象。

总的来说,ObjectEncoderObjectDecoder 是 Netty 中用于对象序列化和反序列化的工具,它们让开发者可以更方便地在网络中传输对象。


Code


这段代码是一个简单的Netty服务器启动类

package com.artisan.codec.objectencoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 配置ServerBootstrap
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectDecoder
                            pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty server start。。");
            // 绑定端口并启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(4567).sync();
            // 等待服务器通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上述代码中,NettyServer类通过ServerBootstrap配置并启动了一个Netty服务器。服务器使用了两个事件循环组:一个用于处理连接(bossGroup),另一个用于处理已连接的通道(workerGroup)。

initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectDecoder和自定义的处理器NettyServerHandlerObjectDecoder用于反序列化接收到的字节流为Java对象,NettyServerHandler用于处理业务逻辑。

服务器启动后,会绑定到指定端口(本例中为4567),并等待服务器通道关闭。在关闭服务器之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。

请注意,此代码片段仅作为Netty服务器启动的示例,实际应用中需要根据具体业务需求调整NettyServerHandler以实现相应的功能。


package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到客户端发送的消息时,执行该方法
        System.out.println("从客户端读取到Object:" + ((ArtisanSimple) msg).toString());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 当发生异常时,执行该方法
        cause.printStackTrace();
        ctx.close();
    }
}

package com.artisan.codec.objectencoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClient {
    public static void main(String[] args) throws Exception {
        // 创建事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 配置Bootstrap
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 初始化通道
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加ObjectEncoder
                            pipeline.addLast(new ObjectEncoder());
                            // 添加自定义的处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            // 打印日志
            System.out.println("netty client start。。");
            // 连接到服务器
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 4567).sync();
            // 等待客户端通道关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭事件循环组
            group.shutdownGracefully();
        }
    }
}

在上述代码中,NettyClient类通过Bootstrap配置并启动了一个Netty客户端。客户端使用了一个事件循环组(group)来处理通道的连接和接收到的消息。

initChannel方法中,初始化了SocketChannel的通道 pipeline,并添加了ObjectEncoder和自定义的处理器NettyClientHandlerObjectEncoder用于将Java对象序列化为字节流,NettyClientHandler用于处理业务逻辑。

客户端启动后,会连接到指定IP地址(本例中为127.0.0.1)和端口(本例中为4567)的服务器,并等待客户端通道关闭。在关闭客户端之前,通过调用shutdownGracefully方法优雅地关闭事件循环组。

请注意,此代码片段仅作为Netty客户端启动的示例,实际应用中需要根据具体业务需求调整NettyClientHandler以实现相应的功能。


这段代码是一个自定义的Netty处理器,名为NettyClientHandler。它继承自ChannelInboundHandlerAdapter,用于处理客户端接收到的消息和通道激活事件。

package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当接收到服务器发送的消息时,执行该方法
        System.out.println("收到服务器消息:" + msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当通道激活时,执行该方法
        System.out.println("NettyClientHandler发送数据");
        // 测试对象编解码
        ArtisanSimple artisanSimple = new ArtisanSimple(1, "xxxx");
        ctx.writeAndFlush(artisanSimple);
    }
}

在上述代码中,NettyClientHandler类重写了channelReadchannelActive方法。

channelRead方法用于处理客户端接收到的服务器消息。在这个例子中,它将打印出接收到的消息。在实际应用中,你可以根据业务需求修改此方法以处理不同的消息类型和逻辑。

channelActive方法用于处理通道激活事件。在这个例子中,它将打印一条日志,并测试对象编解码功能。具体来说,它创建了一个ArtisanSimple对象,并通过ctx.writeAndFlush()方法将其发送到服务器。

在实际应用中,你可以根据需求修改此方法以实现不同的业务逻辑。

NettyClientHandler处理器需要与ObjectEncoderObjectDecoder配合使用,以确保发送和接收到的字节流能够正确地反序列化为Java对象。在客户端启动类NettyClient中,NettyClientHandler已经添加到了通道的pipeline中,因此可以处理发送和接收到的消息。


package com.artisan.codec.objectencoder;
import java.io.Serializable;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class ArtisanSimple implements Serializable {
    private int id;
    private String name;
    public ArtisanSimple() {
    }
    public ArtisanSimple(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "ArtisanSimple{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
package com.artisan.codec.objectencoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class AddressSimple {
    private String location;
    public String getLocation() {
        return location;
    }
    public void setLocation(String location) {
        this.location = location;
    }
    public AddressSimple() {
    }
    public AddressSimple(String location) {
        this.location = location;
    }
    @Override
    public String toString() {
        return "AddressSimple{" +
                "location='" + location + '\'' +
                '}';
    }
}

【测试】


源码分析

ObjectEncoder

这段代码定义了一个名为ObjectEncoder的类,它属于Netty网络通信框架的一部分,用于将Java对象序列化为字节流。

下面是对代码的详细分析以及增加的中文注释:

package io.netty.handler.codec.serialization; 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder; 
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * An encoder which serializes a Java object into a {@link ByteBuf}.
 * <p>
 * 请注意,此编码器产生的序列化形式与标准的{@link ObjectInputStream}不兼容。
 * 请使用{@link ObjectDecoder}或{@link ObjectDecoderInputStream}以确保与该编码器的互操作性。
 */
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    // 定义一个占位符,用于标记ByteBuf中对象序列化数据长度的位置
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    // 覆写encode方法,实现序列化逻辑
    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        int startIdx = out.writerIndex(); // 记录开始编码的位置
        // 创建一个ByteBufOutputStream包装器,用于向ByteBuf中写入数据
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        ObjectOutputStream oout = null;
        try {
            bout.write(LENGTH_PLACEHOLDER); // 先写入长度占位符
            // 创建一个紧凑型ObjectOutputStream,用于序列化对象
            oout = new CompactObjectOutputStream(bout);
            oout.writeObject(msg); // 将要序列化的对象写入流中
            oout.flush(); // 刷新输出流,确保所有数据都被写出
        } finally {
            // 关闭ObjectOutputStream和ByteBufOutputStream
            if (oout != null) {
                oout.close();
            } else {
                bout.close();
            }
        }
        int endIdx = out.writerIndex(); // 记录编码结束的位置
        // 设置占位符的长度,即实际序列化数据长度
        out.setInt(startIdx, endIdx - startIdx - 4);
    }
}

在上述代码中,ObjectEncoder类继承自MessageToByteEncoder,这意味着它是一个用于将某种类型消息编码成字节流的编码器。encode方法被重写以实现序列化过程。在这个方法中,首先通过ByteBufOutputStreamByteBuf写入了一个长度占位符,然后通过CompactObjectOutputStream将传入的Serializable对象序列化成字节流,并写入到ByteBuf中。最后,修改了长度占位符,将其设置为实际序列化数据的长度。

此代码片段使用@Sharable注解标记,表明这个ChannelHandler是可以共享给多个ChannelPipeline的。

序列化完成后,通过ObjectOutputStreamflush方法刷新流,确保所有数据都被写出。最后,在finally块中关闭输出流,确保资源被正确释放。


ObjectDecoder

这段代码定义了一个名为ObjectDecoder的类,它也是Netty网络通信框架的一部分,用于将接收到的字节流反序列化为Java对象。

下面是对代码的详细分析以及增加的中文注释:

package io.netty.handler.codec.serialization;
// 引入Netty相关类库
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 引入Java序列化相关类库
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
/**
 * A decoder which deserializes the received {@link ByteBuf}s into Java
 * objects.
 * <p>
 * 请注意,此解码器期望的序列化形式与标准的{@link ObjectOutputStream}不兼容。
 * 请使用{@link ObjectEncoder}或{@link ObjectEncoderOutputStream}以确保与该解码器的互操作性。
 */
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
    // ClassResolver用于加载序列化对象的类
    private final ClassResolver classResolver;
    /**
     * 创建一个新的解码器,其最大对象大小为1048576字节。
     * 如果接收到的对象大小大于1048576字节,将抛出StreamCorruptedException异常。
     *
     * @param classResolver  用于此解码器的ClassResolver
     */
    public ObjectDecoder(ClassResolver classResolver) {
        this(1048576, classResolver);
    }
    /**
     * 创建一个新的解码器,其最大对象大小为指定的值。
     *
     * @param maxObjectSize  序列化对象的最大字节长度。
     *                     如果接收到的对象的长度大于此值,将抛出StreamCorruptedException异常。
     * @param classResolver  用于加载序列化对象类的ClassResolver
     */
    public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
        super(maxObjectSize, 0, 4, 0, 4);
        this.classResolver = classResolver;
    }
    // 覆写decode方法,实现反序列化逻辑
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }
        // 创建一个紧凑型ObjectInputStream,用于反序列化对象
        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {
            return ois.readObject(); // 读取并返回反序列化的对象
        } finally {
            ois.close(); // 关闭输入流
        }
    }
}

在上述代码中,ObjectDecoder类继承自LengthFieldBasedFrameDecoder,这意味着它是一个用于解码具有长度字段帧的解码器。decode方法被重写以实现反序列化过程。在这个方法中,首先通过LengthFieldBasedFrameDecoder的解码方法获取到包含序列化数据的ByteBuf帧,然后通过CompactObjectInputStream将字节流反序列化为Java对象。

此代码片段使用了一个ClassResolver,它负责加载序列化对象的类,从而允许在反序列化过程中创建对象。反序列化完成后,通过ObjectInputStreamclose方法关闭输入流,确保资源被正确释放。


小结

ObjectEncoder和ObjectDecoder是Netty框架中的两个重要组件,它们分别负责将Java对象编码为字节流以及将字节流解码为Java对象。

ObjectEncoder是一个ChannelOutboundHandler,它主要负责将Java对象转换为字节流。当发送一个对象时,ObjectEncoder会根据对象的类型将其序列化为字节流,以便在网络上进行传输。ObjectEncoder通常与ObjectDecoder配合使用,以确保编码和解码过程能够正确地进行。

ObjectDecoder是一个ChannelInboundHandler,它主要负责将接收到的字节流解码为Java对象。当接收到字节流时,ObjectDecoder会根据字节流的类型进行反序列化,将字节流转换回原始的Java对象。ObjectDecoder通常与ObjectEncoder配合使用,以确保编码和解码过程能够正确地进行。

在实际应用中,ObjectEncoderObjectDecoder需要根据业务需求进行定制,以便正确地处理各种不同类型的对象。通过使用这两个组件,Netty框架可以在发送和接收消息时自动进行对象的编码和解码,简化了网络编程的复杂度。


相关文章
|
4天前
|
机器学习/深度学习 缓存 算法
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
netty源码解解析(4.0)-25 ByteBuf内存池:PoolArena-PoolChunk
|
1月前
|
编解码 开发者
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
Netty Review - 深入理解Netty: ChannelHandler的生命周期与事件处理机制
72 0
|
1月前
|
前端开发 UED
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
129 0
|
1月前
|
监控 网络协议 调度
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
167 0
|
1月前
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
55 0
|
1月前
|
网络协议
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
70 0
|
1月前
|
缓存 前端开发 Java
Netty Review - Netty与Protostuff:打造高效的网络通信
Netty Review - Netty与Protostuff:打造高效的网络通信
58 0
|
1月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
68 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13276 1
|
1月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
54 1