Netty 底层是基于 TCP 协议来处理网络数据传输。我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 “包” 的概念呢?
TCP是四层协议不负责数据逻辑的处理,但是数据在TCP层 “流” 的时候为了保证安全和节约效率会把 “流” 做一些分包处理,比如:
- 发送方约定了每次数据传输的最大包大小,超过该值的内容将会被拆分成两个包发送;
- 发送端 和 接收端 约定每次发送数据包长度并随着网络状况动态调整接收窗口大小,这里也会出现拆包的情况;
一、TCP 中可能出现粘包/拆包的原因
数据流在TCP协议下传播,因为协议本身对于流有一些规则的限制,这些规则会导致当前对端接收到的数据包不完整,归结原因有下面三种情况:
- Socket 缓冲区与滑动窗口
- MSS/MTU限制
- Nagle算法
1. Socket缓冲区与滑动窗口
对于 TCP 协议而言,它传输数据是基于字节流传输的。应用层在传输数据时,实际上会先将数据写入到 TCP 套接字的缓冲区,当缓冲区被写满后,数据才会被写出去。每个TCP Socket 在内核中都有一个发送缓冲区(SO_SNDBUF )和一个接收缓冲区(SO_RCVBUF),TCP 的全双工的工作模式以及 TCP 的滑动窗口便是依赖于这两个独立的 buffer 以及此 buffer 的填充状态。
SO_SNDBUF:
进程发送的数据的时候假设调用了一个 send 方法,将数据拷贝进入 Socket 的内核发送缓冲区之中,然后 send 便会在上层返回。换句话说,send 返回之时,数据不一定会发送到对端去(和write写文件有点类似),send 仅仅是把应用层 buffer 的数据拷贝进 Socket 的内核发送 buffer 中。
SO_RCVBUF:
把接收到的数据缓存入内核,应用进程一直没有调用 read 进行读取的话,此数据会一直缓存在相应 Socket 的接收缓冲区内。不管进程是否读取 Socket,对端发来的数据都会经由内核接收并且缓存到 Socket 的内核接收缓冲区之中。read 所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的 buffer 里面,仅此而已。
接收缓冲区保存收到的数据一直到应用进程读走为止。对于 TCP,如果应用进程一直没有读取,buffer 满了之后发生的动作是:通知对端 TCP 协议中的窗口关闭。这个便是滑动窗口的实现。保证 TCP 套接口接收缓冲区不会溢出,从而保证了 TCP 是可靠传输。因为对方不允许发出超过所通告窗口大小的数据。 这就是 TCP 的流量控制,如果对方无视窗口大小而发出了超过窗口大小的数据,则接收方 TCP 将丢弃它。
滑动窗口:
TCP连接在三次握手的时候,会将自己的窗口大小(window size)发送给对方,其实就是 SO_RCVBUF 指定的值。之后在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送。
每次发送数据后,发送方将自己维护的对方的 window size 减小,表示对方的 SO_RCVBUF 可用空间变小。
当接收方处理开始处理 SO_RCVBUF 中的数据时,会将数据从 Socket 在内核中的接受缓冲区读出,此时接收方的 SO_RCVBUF 可用空间变大,即 window size 变大,接受方会以 ack 消息的方式将自己最新的 window size 返回给发送方,此时发送方将自己的维护的接受的方的 window size 设置为ack消息返回的 window size。
此外,发送方可以连续的给接受方发送消息,只要保证对方的 SO_RCVBUF 空间可以缓存数据即可,即 window size>0。当接收方的 SO_RCVBUF 被填充满时,此时 window size=0,发送方不能再继续发送数据,要等待接收方 ack 消息,以获得最新可用的 window size。
2. MSS/MTU分片
MTU (Maxitum Transmission Unit,最大传输单元)是链路层对一次可以发送的最大数据的限制。MSS(Maxitum Segment Size,最大分段大小)是 TCP 报文中 data 部分的最大长度,是传输层对一次可以发送的最大数据的限制。
数据在传输过程中,每经过一层,都会加上一些额外的信息:
- 应用层:只关心发送的数据 data,将数据写入 Socket 在内核中的缓冲区 SO_SNDBUF 即返回,操作系统会将 SO_SNDBUF 中的数据取出来进行发送;
- 传输层:会在 data 前面加上 TCP Header(20字节);
- 网络层:会在 TCP 报文的基础上再添加一个 IP Header,也就是将自己的网络地址加入到报文中。IPv4 中 IP Header 长度是 20 字节,IPV6 中 IP Header 长度是 40 字节;
- 链路层:加上 Datalink Header 和 CRC。会将 SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和 Type 域加入。SMAC+DMAC+Type+CRC 总长度为 18 字节;
- 物理层:进行传输。
在回顾这个基本内容之后,再来看 MTU 和 MSS。MTU 是以太网传输数据方面的限制,每个以太网帧最大不能超过 1518bytes。刨去以太网帧的帧头(DMAC+SMAC+Type域) 14Bytes 和帧尾 (CRC校验 ) 4 Bytes,那么剩下承载上层协议的地方也就是 data 域最大就只能有 1500 Bytes 这个值 我们就把它称之为 MTU。
MSS 是在 MTU 的基础上减去网络层的 IP Header 和传输层的 TCP Header 的部分,这就是 TCP 协议一次可以发送的实际应用数据的最大大小。
3. Nagle 算法
TCP/IP 协议中,无论发送多少数据,总是要在数据(data)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送 ACK 表示确认。
即使从键盘输入的一个字符,占用一个字节,可能在传输上造成 41 字节的包,其中包括 1 字节的有用信息和 40 字节的首部数据。这种情况转变成了 4000% 的消耗,这样的情况对于重负载的网络来是无法接受的。称之为"糊涂窗口综合征"。
为了尽可能的利用网络带宽,TCP 总是希望尽可能的发送足够大的数据。(一个连接会设置 MSS 参数,因此,TCP/IP 希望每次都能够以 MSS 尺寸的数据块来发送数据)。Nagle 算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
Nagle 算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓 “小段”,指的是小于 MSS 尺寸的数据块;所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的 ACK 确认该数据已收到。
Nagle 算法的规则:
- 如果 SO_SNDBUF 中的数据长度达到 MSS,则允许发送;
- 如果该 SO_SNDBUF 中含有 FIN,表示请求关闭连接,则先将 SO_SNDBUF 中的剩余数据发送,再关闭;
- 设置了 TCP_NODELAY=true 选项,则允许发送。TCP_NODELAY 是取消 TCP 的确认延迟机制,相当于禁用了 Negale 算法。正常情况下,当 Server 端收到数据之后,它并不会马上向 client 端发送 ACK,而是会将 ACK 的发送延迟一段时间(一般是 40ms),它希望在 t 时间内 server 端会向 client 端发送应答数据,这样 ACK 就能够和应答数据一起发送,就像是应答数据捎带着 ACK 过去。当然,TCP 确认延迟 40ms 并不是一直不变的, TCP 连接的延迟确认时间一般初始化为最小值 40ms,随后根据连接的重传超时时间(RTO)、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置 TCP_QUICKACK 选项来取消确认延迟;
- 未设置 TCP_CORK 选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
- 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。
基于以上问题,TCP层肯定是会出现当次接收到的数据是不完整数据的情况。出现粘包可能的原因有:
- 发送方每次写入数据 < 套接字缓冲区大小;
- 接收方读取套接字缓冲区数据不够及时。
出现半包的可能原因有:
- 发送方每次写入数据 > 套接字缓冲区大小;
- 发送的数据大于协议 MTU,所以必须要拆包。
解决问题肯定不是在4层来做而是在应用层,通过定义通信协议来解决粘包和拆包的问题。发送方 和 接收方约定某个规则:
- 当发生粘包的时候通过某种约定来拆包;
- 如果在拆包,通过某种约定来将数据组成一个完整的包处理。
二、业界常用解决方案
1. 定长协议
指定一个报文具有固定长度。比如约定一个报文的长度是 5 字节,那么:
报文:1234,只有4字节,但是还差一个怎么办呢,不足部分用空格补齐。就变为:1234 。
如果不补齐空格,那么就会读到下一个报文的字节来填充上一个报文直到补齐为止,这样粘包了。
定长协议的优点是使用简单,缺点很明显:浪费带宽。
Netty 中提供了 FixedLengthFrameDecoder ,支持把固定的长度的字节数当做一个完整的消息进行解码。
2. 特殊字符分割协议
很好理解,在每一个你认为是一个完整的包的尾部添加指定的特殊字符,比如:\n,\r等等。
需要注意的是:约定的特殊字符要保证唯一性,不能出现在报文的正文中,否则就将正文一分为二了。
Netty 中提供了 DelimiterBasedFrameDecoder 根据特殊字符进行解码,LineBasedFrameDecoder默认以换行符作为分隔符。
3. 变长协议
变长协议的核心就是:将消息分为消息头和消息体,消息头中标识当前完整的消息体长度。
- 发送方在发送数据之前先获取数据的二进制字节大小,然后在消息体前面添加消息大小;
- 接收方在解析消息时先获取消息大小,之后必须读到该大小的字节数才认为是完整的消息。
Netty 中提供了 LengthFieldBasedFrameDecoder ,通过LengthFieldPrepender 来给实际的消息体添加 length 字段。
三、Netty粘包和拆包解决方案
为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
- FixedLengthFrameDecoder(使用定长的报文来分包)
- DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包)
- LineBasedFrameDecoder(数据未尾添加回车换行符来分包)
- LengthFieldBasedFrameDecoder(使用消息头和消息体来分包)
- ByteToMessageDecoder(如果想实现自己的半包解码器,实现该类)
- MessageToMessageDecoder(一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类)
- StringDecoder(字符串解码器)
- ProtoBufVarint32FrameDecoder(通过 Protobuf 解码器来区分整包消息)
- ProtobufDecoder(Protobuf 解码器)
Netty 还内置了如下的编码器:
- MessageToByteEncoder(将 Java 对象编码成 ByteBuf)
- MessageToMessageEncoder(如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个)
- LengthFieldPrepender(如果我们在发送消息的时候采用:消息长度字段+原始消息的形式,我们就可以使用 LengthFieldPrepender。因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节)
- ProtobufVarint32LengthFieldPrepender(Protobuf 编码器,在原来的数据前面,追加一个长度)
- ProtobufEncoder(Protobuf 编码器)
LineBasedFrameDecoder 使用
LineBasedFrameDecoder 通过在包尾添加回车换行符\r\n 来区分整包消息
复制代码 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 将分隔之后的字节数据转换为字符串数据 ch.pipeline().addLast(new StringDecoder()); // 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符 ch.pipeline().addLast(new LineBasedFrameEncoder()); // 最终处理数据并且返回响应的handler ch.pipeline().addLast(new EchoServerHandler()); }
FixedLengthFrameDecoder解码器
对于使用固定长度的粘包和拆包场景,可以使用FixedLengthFrameDecoder,该解码器会每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足
public class EchoServer { public void bind(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为20 ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); // 将前一步解码得到的数据转码为字符串 ch.pipeline().addLast(new StringDecoder()); // 这里FixedLengthFrameEncoder是我们自定义的,用于将长度不足20的消息进行补全空格 ch.pipeline().addLast(new FixedLengthFrameEncoder(20)); // 最终的数据处理 ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
DelimiterBasedFrameDecoder解码器
DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。
@Override protected void initChannel(SocketChannel ch) throws Exception { String delimiter = "_$"; // 将delimiter设置到DelimiterBasedFrameDecoder中,经过该解码一器进行处理之后,源数据将会 // 被按照_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,若还是未 // 读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.getBytes()))); // 将分隔之后的字节数据转换为字符串数据 ch.pipeline().addLast(new StringDecoder()); // 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符 ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter)); // 最终处理数据并且返回响应的handler ch.pipeline().addLast(new EchoServerHandler()); }
LengthFieldBasedFrameDecoder 使用
LengthFieldBasedFrameDecoder相对就高端一点。前面我们使用到的拆包都是基于一些约定来做的,比如固定长度,特殊分隔符,这些方案总是有一定的弊端。最好的方案就是:发送方告诉我当前消息总长度,接收方如果没有收到该长度大小的数据就认为是没有收完继续等待。
* @param maxFrameLength 帧的最大长度 * * @param lengthFieldOffset 长度字段偏移的地址 * * @param lengthFieldLength 长度字段所占的字节长 * 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度, * 若为负数,则说明要推后多少个字段 * @param lengthAdjustment 解析时候跳过多少个长度 * * @param initialBytesToStrip 解码出一个数据包之后,去掉开头的字节数 * * @param initialBytesToStrip 为true,当frame长度超过maxFrameLength时立即报 * TooLongFrameException异常,为false,读取完整个帧再报异 * */ public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
public class EchoServer { public void bind(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据 // 进行长度字段解码,这里也会对数据进行粘包和拆包处理 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段 ch.pipeline().addLast(new LengthFieldPrepender(2)); // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到User对象 ch.pipeline().addLast(new JsonDecoder()); // 对响应数据进行编码,主要是将User对象序列化为json ch.pipeline().addLast(new JsonEncoder()); // 处理客户端的请求的数据,并且进行响应 ch.pipeline().addLast(new EchoServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new EchoServer().bind(8080); } }
📢文章下方有交流学习区!一起学习进步!也可以前往官网,加入官方微信交流群💪💪💪
📢创作不易,如果觉得文章不错,可以点赞👍收藏📁评论📒
📢你的支持和鼓励是我创作的动力❗❗❗
官网:Doker 多克;官方旗舰店:官方旗舰店 全品优惠