概述
Pre
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战 中我们遗留了一个内容
今天我们就通过自定义长度分包解码器来解决粘包拆包的问题
概述
在Netty中,自定义长度分包编解码器通常涉及到两个组件:
- 一个用于编码的
MessageToByteEncoder
, - 另一个用于解码的
ByteToMessageDecoder
。
Code
核心思路:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
服务器端程序如下,其目的是创建一个服务,该服务器监听1234端口,并使用自定义的编解码器处理接收到的消息。
package com.artisan.pack_custom_codec; import com.artisan.pack_custom_codec.codec.CustomMessageDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class ArtisanServer { public static void main(String[] args) throws InterruptedException { // 主事件循环组,用于接受进来的连接,这里只设置了1个线程 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 从事件循环组,用于处理已接受连接的IO操作,这里设置了8个线程 EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { // 创建服务器启动类 ServerBootstrap bootstrap = new ServerBootstrap(); // 配置事件循环组 bootstrap.group(bossGroup, workerGroup) // 使用NioServerSocketChannel作为服务器通道 .channel(NioServerSocketChannel.class) // 设置一个选项,用于配置服务器同步接受连接的backlog大小 .option(ChannelOption.SO_BACKLOG, 1024) // 配置子通道初始化处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 获取通道的管道 ChannelPipeline pipeline = ch.pipeline(); // 添加自定义的解码器 pipeline.addLast("customDecoder",new CustomMessageDecoder()); // 添加业务处理handler pipeline.addLast(new ArtisanServerHandler()); } }); // 绑定端口并同步等待成功,打印成功启动信息 ChannelFuture channelFuture = bootstrap.bind(1234).sync(); System.out.println("Talk Room Server启动成功,监听1234端口"); // 关闭通道 // 这里的关闭是等待服务器socket关闭,通常在服务器关闭时调用 channelFuture.channel().closeFuture().sync(); } finally { // 优雅地关闭主从事件循环组,释放资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
代码中主要涉及以下部分:
- 服务器端的启动类
ArtisanServer
。 NioEventLoopGroup
的使用,用于接受和处理网络事件。ServerBootstrap
的配置,用于设置服务器参数。CustomMessageDecoder
的使用,用于自定义消息的编解码方式。ArtisanServerHandler
的添加,用于处理具体的业务逻辑。- 端口绑定和通道关闭的操作。
自定义协议
package com.artisan.pack_custom_codec.protocol; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world * @description: 自定义协议 */ public class CustomMessageProtocol { /** * 定义一次发送包体长度 */ private int len; /** * 一次发送包体内容 */ private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
自定义解码器
server端接收网卡过来的消息,入站事件,自然是要解码,将网络传输的二进制数据转换为对象
package com.artisan.pack_custom_codec.codec; import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class CustomMessageDecoder extends ByteToMessageDecoder { int length = 0; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println(); System.out.println("CustomMessageDecoder decode 被调用"); //需要将得到二进制字节码-> CustomMessageDecoder 数据包(对象) System.out.println(in); // int 占 4个字节 (int 类型占用4个字节,即32位) if (in.readableBytes() >= 4) { if (length == 0) { length = in.readInt(); } if (in.readableBytes() < length) { System.out.println("当前可读数据不够,继续等待。。"); return; } byte[] content = new byte[length]; if (in.readableBytes() >= length) { in.readBytes(content); //封装成CustomMessageDecoder对象,传递到下一个handler业务处理 CustomMessageProtocol messageProtocol = new CustomMessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); out.add(messageProtocol); } length = 0; } } }
服务端的消息处理
package com.artisan.pack_custom_codec; import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class ArtisanServerHandler extends SimpleChannelInboundHandler<CustomMessageProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, CustomMessageProtocol msg) throws Exception { System.out.println("====服务端接收到消息如下===="); System.out.println("长度=" + msg.getLen()); System.out.println("内容=" + new String(msg.getContent(), CharsetUtil.UTF_8)); System.out.println("服务端接收到消息包数量=" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
接下来继续看看客户端该如何处理 ?
客户端启动类
package com.artisan.pack_custom_codec; import com.artisan.pack_custom_codec.codec.CustomMessageEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class ArtisanClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 自定义长度分包编码器 pipeline.addLast("customEncoder",new CustomMessageEncoder()); //加入自己的业务处理handler pipeline.addLast(new ArtisanClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync(); //得到 channel Channel channel = channelFuture.channel(); System.out.println("========" + channel.localAddress() + "========"); //客户端需要输入信息, 创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过 channel 发送到服务器端 channel.writeAndFlush(msg); } // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
这段代码定义了一个名为CustomMessageEncoder
的Netty编解码器,用于将自定义协议的消息编码为字节流。下面是代码的详细解释:
CustomMessageEncoder
类继承自MessageToByteEncoder
,这意味着它将被用于编码CustomMessageProtocol
类型的消息。encode
方法在需要将消息编码为字节流时调用。在这个方法中,首先打印了一条消息,表明encode
方法被调用了。然后,它将消息的长度写入到输出缓冲区out
中,接着将消息内容写入到输出缓冲区。
这个编解码器的主要作用是将自定义协议的消息转换为字节流,以便可以在网络上传输。它首先写入消息的长度,然后写入消息的内容,这样接收方就可以根据长度来解析消息的内容。
自定义编码器
package com.artisan.pack_custom_codec.codec; import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, CustomMessageProtocol msg, ByteBuf out) throws Exception { System.out.println("CustomMessageEncoder encode called~"); out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }
客户端发数据到网卡,出站事件 , 需要编码。
客户端业务处理Handler
package com.artisan.pack_custom_codec; import com.artisan.pack_custom_codec.protocol.CustomMessageProtocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class ArtisanClientHandler extends SimpleChannelInboundHandler<CustomMessageProtocol> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 2; i++) { String msg = "小工匠的Netty Review之旅"; //创建协议包对象 CustomMessageProtocol customMessageProtocol = new CustomMessageProtocol(); customMessageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length); customMessageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(customMessageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, CustomMessageProtocol msg) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
这段代码定义了一个名为ArtisanClientHandler
的Netty处理器,用于处理自定义协议的消息。下面是代码的详细解释:
ArtisanClientHandler
类继承自SimpleChannelInboundHandler
,这意味着它将被用于处理CustomMessageProtocol
类型的消息。channelActive
方法在Netty通道激活时调用。在这个方法中,代码循环两次,发送一个包含特定字符串的消息。每次循环,它都会创建一个CustomMessageProtocol
对象,设置消息长度,并填充内容,然后通过ctx.writeAndFlush
方法将消息写入通道。channelRead0
方法在接收到消息时调用。在这个例子中,该方法为空,没有实现任何功能。exceptionCaught
方法在捕获到异常时调用。在这个方法中,它打印了异常的堆栈跟踪,并关闭了通道。
测试
启动server 和 client