概述
Pre
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
概述
粘包和拆包是在计算机网络通信中常见的问题,特别是在使用基于流的传输协议(如TCP)时。这两个问题涉及到数据在传输过程中的组织和解析。
- 粘包(Packet Concatenation):
- 定义: 粘包指的是发送方发送的多个小数据包在接收方看来被组合成一个大的数据包。
- 原因: 发送方连续发送的数据可能在网络中被合并成一个数据流,导致接收方无法准确分辨每个数据包的边界。
- 可能的解决方案: 使用特殊的分隔符标记数据包的边界,或者在数据包中包含长度信息。
- 拆包(Packet Fragmentation):
- 定义: 拆包是指接收方接收到的数据包过大,被拆分成多个较小的数据包。
- 原因: 数据包在传输过程中可能被分割,到达接收方时需要重新组装。
- 可能的解决方案: 在数据包中包含长度信息,或者使用特殊的标记表示数据包的边界。
在处理粘包和拆包问题时,通信双方需要协调一致,以确保数据的正确性和完整性。使用合适的协议和通信模式,以及采用适当的分隔符或长度字段,有助于减轻或解决这些问题。
TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区
的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成
一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
比如
正常情况:
发生了粘包:
发生了拆包:
或者
场景复现
我们的代码还是以 Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力中的代码为基础,演示一下粘包拆包
启动Server 和 Client ()
【TalkRoomClient2】发送10条消息
package com.artisan.pack; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class TalkRoomClient2 { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包) //pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes()))); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new TalkRoomClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync(); //得到 channel Channel channel = channelFuture.channel(); System.out.println("========" + channel.localAddress() + "========"); // 模拟 拆包粘包 for (int i = 0; i < 10; i++) { channel.writeAndFlush("小工匠123"); } // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
【TalkRoomClient】接收 Client2 ---- Server — 自己的消息
package com.artisan.pack; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class TalkRoomClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包) //pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes()))); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new TalkRoomClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync(); //得到 channel Channel channel = channelFuture.channel(); System.out.println("========" + channel.localAddress() + "========"); //客户端需要输入信息, 创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过 channel 发送到服务器端 channel.writeAndFlush(msg); } // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
【测试】
出现了粘包和拆包的现象
解决办法概览
1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不
能出现分隔符。
3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度
来判断每条数据的开始和结束。 (推荐方案)
Netty提供了多个解码器,可以进行分包的操作,如下:
LineBasedFrameDecoder
(回车换行分包)DelimiterBasedFrameDecoder
(特殊分隔符分包)FixedLengthFrameDecoder
(固定长度报文来分包)
我们先使用第二种方案来描述一下
方式一: 特殊分隔符分包 (演示Netty提供的众多方案中的一种)
我们来看下如何改造?
【TalkRoomServer 】
重点关注的地方是DelimiterBasedFrameDecoder
,这是一个基于分隔符的帧解码器,用于处理客户端发送的按照特定分隔符(在这里是下划线_
)分割的数据包。
package com.artisan.pack; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class TalkRoomServer { public static void main(String[] args) throws InterruptedException { // 创建主事件循环组,用于接受进来的连接 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建工作事件循环组,用于处理已接受连接的IO操作 EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { ServerBootstrap bootstrap = new ServerBootstrap(); // 配置服务器 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel接受进来的连接 .option(ChannelOption.SO_BACKLOG, 1024) // 设置连接队列大小 .childHandler(new ChannelInitializer<SocketChannel>() { // 配置子通道初始化器 @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 分隔符解码器,按照下划线拆包 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes()))); // 添加字符串解码器 pipeline.addLast("decoder", new StringDecoder()); // 添加字符串编码器 pipeline.addLast("encoder", new StringEncoder()); // 添加自定义的业务处理handler pipeline.addLast(new TalkRoomServerHandler()); } }); // 绑定端口并同步等待成功,然后返回ChannelFuture对象 ChannelFuture channelFuture = bootstrap.bind(1234).sync(); // 打印服务器启动成功信息 System.out.println("Talk Room Server启动成功,监听1234端口"); // 等待服务器socket关闭 channelFuture.channel().closeFuture().sync(); } finally { // 释放资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在initChannel
方法中,DelimiterBasedFrameDecoder
被加入到管道中。它用于接收按分隔符(这里是下划线_
)分割的数据包,并把这些数据包转换成一个个的Frame
对象,这样就可以在后续的处理器中逐个处理这些数据包了。这种方式的优点是可以有效处理大量且不定长度的数据包,而不需要担心数据包过大导致内存溢出的问题。
【TalkRoomServerHandler】
因为我们队数据进行了加工转发,所以加工后的消息,也得按照DelimiterBasedFrameDecoder
的处理规则增加 “_”
同样的Client的Pipeline中别忘了增加解码器
启动Server和Client ,我们来测试下
附上其他的代码
package com.artisan.pack; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class TalkRoomClient2 { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包) pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes()))); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new TalkRoomClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync(); //得到 channel Channel channel = channelFuture.channel(); System.out.println("========" + channel.localAddress() + "========"); // 模拟 拆包粘包 for (int i = 0; i < 10; i++) { channel.writeAndFlush("小工匠123" + Delimiter.SPLIT); } } finally { group.shutdownGracefully(); } } }
package com.artisan.pack; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; /** * @author 小工匠 * @version 1.0 * @mark: show me the code , change the world */ public class TalkRoomClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包) pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes()))); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new TalkRoomClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync(); //得到 channel Channel channel = channelFuture.channel(); System.out.println("========" + channel.localAddress() + "========"); //客户端需要输入信息, 创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过 channel 发送到服务器端 channel.writeAndFlush(msg); } // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
package com.artisan.pack; /** * @author artisan */ public interface Delimiter { String SPLIT = "_" ; }
流程分析
方式二: 发送长度(推荐)
TODO
DelimiterBasedFrameDecoder 源码分析
A decoder that splits the received ByteBufs by one or more delimiters. It is particularly useful for decoding the frames which ends with a delimiter such as NUL or newline characters. Predefined delimiters Delimiters defines frequently used delimiters for convenience' sake. Specifying more than one delimiter DelimiterBasedFrameDecoder allows you to specify more than one delimiter. If more than one delimiter is found in the buffer, it chooses the delimiter which produces the shortest frame. For example, if you have the following data in the buffer: +--------------+ | ABC\nDEF\r\n | +--------------+ a DelimiterBasedFrameDecoder(Delimiters.lineDelimiter()) will choose '\n' as the first delimiter and produce two frames: +-----+-----+ | ABC | DEF | +-----+-----+ rather than incorrectly choosing '\r\n' as the first delimiter: +----------+ | ABC\nDEF | +----------+
/** * 从{@link ByteBuf}中创建一个帧并返回。 * * @param ctx 此{@link ByteToMessageDecoder}所属的{@link ChannelHandlerContext} * @param buffer 要从中读取数据的{@link ByteBuf} * @return frame 表示帧的{@link ByteBuf},如果没有帧可创建,则返回{@code null} */ protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null) { // 如果设置了行解码器,则使用行解码器进行解码 return lineBasedDecoder.decode(ctx, buffer); } // 尝试所有的分隔符,并选择产生最短帧的分隔符 int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } } if (minDelim != null) { int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { // 刚刚丢弃了一个非常大的帧 // 回到初始状态 discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } if (minFrameLength > maxFrameLength) { // 丢弃读取的帧 buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } if (stripDelimiter) { // 如果需要去除分隔符,则从buffer中读取帧 frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { // 否则,直接从buffer中读取包含分隔符的帧 frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { // 丢弃buffer中的内容,直到找到分隔符 tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { // 由于没有找到分隔符,仍在丢弃buffer tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; } }
这段代码是DelimiterBasedFrameDecoder
类的decode
方法的实现。这个方法的主要作用是根据指定的分隔符将输入的ByteBuf
对象中的数据分割成一个个的帧。
首先,我们来看一下方法的定义:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { // ... }
decode
方法接收两个参数:
ctx
:解码器所在的ChannelHandlerContext
对象。buffer
:待解码的ByteBuf
对象。
接下来,我们逐行解析代码并添加中文注释:
if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); }
如果存在行基于的解码器,则使用该解码器进行解码。
int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } }
遍历所有的分隔符,并找到能产生最短帧的分隔符。
if (minDelim != null) { // ... } else { // ... }
如果找到了分隔符,则根据分隔符分割数据;如果没有找到分隔符,则跳过超过最大帧长度的数据。
if (discardingTooLongFrame) { // ... } else { // ... }
如果正在丢弃过长的帧,则回到初始状态;否则,检查当前帧长度是否超过最大帧长度。
if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); }
根据stripDelimiter
的值来确定是否需要去除分隔符。
return frame;
返回分割后的帧。
if (!discardingTooLongFrame) { // ... } else { // ... }
如果不在丢弃过长的帧,则检查缓冲区中可读字节数是否超过最大帧长度;否则,继续丢弃缓冲区中的数据。
tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes());
累加过长的帧长度,并跳过过长的数据。
return null;
如果没有找到分隔符,则返回null
。
通过以上代码,DelimiterBasedFrameDecoder
可以根据指定的分隔符将输入的ByteBuf
对象中的数据分割成一个个的帧。这样,就可以在后续的处理器中逐个处理这些帧了。