概述
Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。
ObjectEncoder
ObjectEncoder
是 Netty 中用于将对象编码为字节流的一种组件。在 Netty 的 pipeline 中,当你需要将某个对象发送到网络时,你可以使用 ObjectEncoder
来实现。它会将对象序列化为字节流,以便可以在网络中传输。
例如,当你使用 Netty 的 Bootstrap
类来配置你的客户端时,你可以为你的 channel pipeline 添加一个 ObjectEncoder
:
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); // 添加其他 handlers... } });
在这个例子中,ObjectEncoder
被添加到了 channel 的 pipeline 中,这样在数据传输过程中,发送的对象就会被自动编码为字节流。
ObjectDecoder
与 ObjectEncoder
相对应,ObjectDecoder
是用于将接收到的字节流解码为对象的组件。当你在 Netty 的 pipeline 中接收到字节流时,你可以使用 ObjectDecoder
来自动将字节流反序列化为对象。
继续上面的例子,如果你想在 pipeline 中添加 ObjectDecoder
,你可以这样做:
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectDecoder()); // 添加其他 handlers... } });
在这个例子中,ObjectDecoder
被添加到了 channel 的 pipeline 中,这样在数据接收过程中,接收到的字节流就会被自动解码为对象。
总的来说,ObjectEncoder
和 ObjectDecoder
是 Netty 中用于对象序列化和反序列化的工具,它们让开发者可以更方便地在网络中传输对象。
Code
这段代码是一个简单的Netty服务器启动类
package com.artisan.codec.objectencoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class NettyServer { public static void main(String[] args) throws Exception { // 创建事件循环组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建ServerBootstrap ServerBootstrap serverBootstrap = new ServerBootstrap(); // 配置ServerBootstrap serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 初始化通道 ChannelPipeline pipeline = ch.pipeline(); // 添加ObjectDecoder pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); // 添加自定义的处理器 pipeline.addLast(new NettyServerHandler()); } }); // 打印日志 System.out.println("netty server start。。"); // 绑定端口并启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(4567).sync(); // 等待服务器通道关闭 channelFuture.channel().closeFuture().sync(); } finally { // 优雅地关闭事件循环组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在上述代码中,NettyServer
类通过ServerBootstrap
配置并启动了一个Netty服务器。服务器使用了两个事件循环组:一个用于处理连接(bossGroup
),另一个用于处理已连接的通道(workerGroup
)。
在initChannel
方法中,初始化了SocketChannel
的通道 pipeline,并添加了ObjectDecoder
和自定义的处理器NettyServerHandler
。ObjectDecoder
用于反序列化接收到的字节流为Java对象,NettyServerHandler
用于处理业务逻辑。
服务器启动后,会绑定到指定端口(本例中为4567),并等待服务器通道关闭。在关闭服务器之前,通过调用shutdownGracefully
方法优雅地关闭事件循环组。
请注意,此代码片段仅作为Netty服务器启动的示例,实际应用中需要根据具体业务需求调整NettyServerHandler
以实现相应的功能。
package com.artisan.codec.objectencoder; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 当接收到客户端发送的消息时,执行该方法 System.out.println("从客户端读取到Object:" + ((ArtisanSimple) msg).toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 当发生异常时,执行该方法 cause.printStackTrace(); ctx.close(); } }
package com.artisan.codec.objectencoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ObjectEncoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class NettyClient { public static void main(String[] args) throws Exception { // 创建事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建Bootstrap Bootstrap bootstrap = new Bootstrap(); // 配置Bootstrap bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 初始化通道 ChannelPipeline pipeline = ch.pipeline(); // 添加ObjectEncoder pipeline.addLast(new ObjectEncoder()); // 添加自定义的处理器 pipeline.addLast(new NettyClientHandler()); } }); // 打印日志 System.out.println("netty client start。。"); // 连接到服务器 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 4567).sync(); // 等待客户端通道关闭 channelFuture.channel().closeFuture().sync(); } finally { // 优雅地关闭事件循环组 group.shutdownGracefully(); } } }
在上述代码中,NettyClient
类通过Bootstrap
配置并启动了一个Netty客户端。客户端使用了一个事件循环组(group
)来处理通道的连接和接收到的消息。
在initChannel
方法中,初始化了SocketChannel
的通道 pipeline,并添加了ObjectEncoder
和自定义的处理器NettyClientHandler
。ObjectEncoder
用于将Java对象序列化为字节流,NettyClientHandler
用于处理业务逻辑。
客户端启动后,会连接到指定IP地址(本例中为127.0.0.1)和端口(本例中为4567)的服务器,并等待客户端通道关闭。在关闭客户端之前,通过调用shutdownGracefully
方法优雅地关闭事件循环组。
请注意,此代码片段仅作为Netty客户端启动的示例,实际应用中需要根据具体业务需求调整NettyClientHandler
以实现相应的功能。
这段代码是一个自定义的Netty处理器,名为NettyClientHandler
。它继承自ChannelInboundHandlerAdapter
,用于处理客户端接收到的消息和通道激活事件。
package com.artisan.codec.objectencoder; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 当接收到服务器发送的消息时,执行该方法 System.out.println("收到服务器消息:" + msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 当通道激活时,执行该方法 System.out.println("NettyClientHandler发送数据"); // 测试对象编解码 ArtisanSimple artisanSimple = new ArtisanSimple(1, "xxxx"); ctx.writeAndFlush(artisanSimple); } }
在上述代码中,NettyClientHandler
类重写了channelRead
和channelActive
方法。
channelRead
方法用于处理客户端接收到的服务器消息。在这个例子中,它将打印出接收到的消息。在实际应用中,你可以根据业务需求修改此方法以处理不同的消息类型和逻辑。
channelActive
方法用于处理通道激活事件。在这个例子中,它将打印一条日志,并测试对象编解码功能。具体来说,它创建了一个ArtisanSimple
对象,并通过ctx.writeAndFlush()
方法将其发送到服务器。
在实际应用中,你可以根据需求修改此方法以实现不同的业务逻辑。
NettyClientHandler
处理器需要与ObjectEncoder
和ObjectDecoder
配合使用,以确保发送和接收到的字节流能够正确地反序列化为Java对象。在客户端启动类NettyClient
中,NettyClientHandler
已经添加到了通道的pipeline中,因此可以处理发送和接收到的消息。
package com.artisan.codec.objectencoder; import java.io.Serializable; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class ArtisanSimple implements Serializable { private int id; private String name; public ArtisanSimple() { } public ArtisanSimple(int id, String name) { super(); this.id = id; this.name = name; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "ArtisanSimple{" + "id=" + id + ", name='" + name + '\'' + '}'; } }
package com.artisan.codec.objectencoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class AddressSimple { private String location; public String getLocation() { return location; } public void setLocation(String location) { this.location = location; } public AddressSimple() { } public AddressSimple(String location) { this.location = location; } @Override public String toString() { return "AddressSimple{" + "location='" + location + '\'' + '}'; } }
【测试】
源码分析
ObjectEncoder
这段代码定义了一个名为ObjectEncoder
的类,它属于Netty网络通信框架的一部分,用于将Java对象序列化为字节流。
下面是对代码的详细分析以及增加的中文注释:
package io.netty.handler.codec.serialization; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; /** * An encoder which serializes a Java object into a {@link ByteBuf}. * <p> * 请注意,此编码器产生的序列化形式与标准的{@link ObjectInputStream}不兼容。 * 请使用{@link ObjectDecoder}或{@link ObjectDecoderInputStream}以确保与该编码器的互操作性。 */ @Sharable public class ObjectEncoder extends MessageToByteEncoder<Serializable> { // 定义一个占位符,用于标记ByteBuf中对象序列化数据长度的位置 private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; // 覆写encode方法,实现序列化逻辑 @Override protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { int startIdx = out.writerIndex(); // 记录开始编码的位置 // 创建一个ByteBufOutputStream包装器,用于向ByteBuf中写入数据 ByteBufOutputStream bout = new ByteBufOutputStream(out); ObjectOutputStream oout = null; try { bout.write(LENGTH_PLACEHOLDER); // 先写入长度占位符 // 创建一个紧凑型ObjectOutputStream,用于序列化对象 oout = new CompactObjectOutputStream(bout); oout.writeObject(msg); // 将要序列化的对象写入流中 oout.flush(); // 刷新输出流,确保所有数据都被写出 } finally { // 关闭ObjectOutputStream和ByteBufOutputStream if (oout != null) { oout.close(); } else { bout.close(); } } int endIdx = out.writerIndex(); // 记录编码结束的位置 // 设置占位符的长度,即实际序列化数据长度 out.setInt(startIdx, endIdx - startIdx - 4); } }
在上述代码中,ObjectEncoder
类继承自MessageToByteEncoder
,这意味着它是一个用于将某种类型消息编码成字节流的编码器。encode
方法被重写以实现序列化过程。在这个方法中,首先通过ByteBufOutputStream
向ByteBuf
写入了一个长度占位符,然后通过CompactObjectOutputStream
将传入的Serializable
对象序列化成字节流,并写入到ByteBuf
中。最后,修改了长度占位符,将其设置为实际序列化数据的长度。
此代码片段使用@Sharable
注解标记,表明这个ChannelHandler
是可以共享给多个ChannelPipeline
的。
序列化完成后,通过ObjectOutputStream
的flush
方法刷新流,确保所有数据都被写出。最后,在finally
块中关闭输出流,确保资源被正确释放。
ObjectDecoder
这段代码定义了一个名为ObjectDecoder
的类,它也是Netty网络通信框架的一部分,用于将接收到的字节流反序列化为Java对象。
下面是对代码的详细分析以及增加的中文注释:
package io.netty.handler.codec.serialization; // 引入Netty相关类库 import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; // 引入Java序列化相关类库 import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.StreamCorruptedException; /** * A decoder which deserializes the received {@link ByteBuf}s into Java * objects. * <p> * 请注意,此解码器期望的序列化形式与标准的{@link ObjectOutputStream}不兼容。 * 请使用{@link ObjectEncoder}或{@link ObjectEncoderOutputStream}以确保与该解码器的互操作性。 */ public class ObjectDecoder extends LengthFieldBasedFrameDecoder { // ClassResolver用于加载序列化对象的类 private final ClassResolver classResolver; /** * 创建一个新的解码器,其最大对象大小为1048576字节。 * 如果接收到的对象大小大于1048576字节,将抛出StreamCorruptedException异常。 * * @param classResolver 用于此解码器的ClassResolver */ public ObjectDecoder(ClassResolver classResolver) { this(1048576, classResolver); } /** * 创建一个新的解码器,其最大对象大小为指定的值。 * * @param maxObjectSize 序列化对象的最大字节长度。 * 如果接收到的对象的长度大于此值,将抛出StreamCorruptedException异常。 * @param classResolver 用于加载序列化对象类的ClassResolver */ public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) { super(maxObjectSize, 0, 4, 0, 4); this.classResolver = classResolver; } // 覆写decode方法,实现反序列化逻辑 @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { return null; } // 创建一个紧凑型ObjectInputStream,用于反序列化对象 ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver); try { return ois.readObject(); // 读取并返回反序列化的对象 } finally { ois.close(); // 关闭输入流 } } }
在上述代码中,ObjectDecoder
类继承自LengthFieldBasedFrameDecoder
,这意味着它是一个用于解码具有长度字段帧的解码器。decode
方法被重写以实现反序列化过程。在这个方法中,首先通过LengthFieldBasedFrameDecoder
的解码方法获取到包含序列化数据的ByteBuf
帧,然后通过CompactObjectInputStream
将字节流反序列化为Java对象。
此代码片段使用了一个ClassResolver
,它负责加载序列化对象的类,从而允许在反序列化过程中创建对象。反序列化完成后,通过ObjectInputStream
的close
方法关闭输入流,确保资源被正确释放。
小结
ObjectEncoder和ObjectDecoder是Netty框架中的两个重要组件,它们分别负责将Java对象编码为字节流以及将字节流解码为Java对象。
ObjectEncoder是一个ChannelOutboundHandler,它主要负责将Java对象转换为字节流。当发送一个对象时,ObjectEncoder会根据对象的类型将其序列化为字节流,以便在网络上进行传输。ObjectEncoder通常与ObjectDecoder配合使用,以确保编码和解码过程能够正确地进行。
ObjectDecoder是一个ChannelInboundHandler,它主要负责将接收到的字节流解码为Java对象。当接收到字节流时,ObjectDecoder会根据字节流的类型进行反序列化,将字节流转换回原始的Java对象。ObjectDecoder通常与ObjectEncoder配合使用,以确保编码和解码过程能够正确地进行。
在实际应用中,ObjectEncoder
和ObjectDecoder
需要根据业务需求进行定制,以便正确地处理各种不同类型的对象。通过使用这两个组件,Netty框架可以在发送和接收消息时自动进行对象的编码和解码,简化了网络编程的复杂度。