Netty 中的粘包和拆包详解

简介: Netty 中的粘包和拆包详解

Netty 底层是基于 TCP 协议来处理网络数据传输。我们知道 TCP 协议是面向字节流的协议,数据像流水一样在网络中传输那何来 “包” 的概念呢?

TCP是四层协议不负责数据逻辑的处理,但是数据在TCP层 “流” 的时候为了保证安全和节约效率会把 “流” 做一些分包处理,比如:

  1. 发送方约定了每次数据传输的最大包大小,超过该值的内容将会被拆分成两个包发送;
  2. 发送端 和 接收端 约定每次发送数据包长度并随着网络状况动态调整接收窗口大小,这里也会出现拆包的情况;

一、TCP 中可能出现粘包/拆包的原因

数据流在TCP协议下传播,因为协议本身对于流有一些规则的限制,这些规则会导致当前对端接收到的数据包不完整,归结原因有下面三种情况:

  • Socket 缓冲区与滑动窗口
  • MSS/MTU限制
  • Nagle算法

1. Socket缓冲区与滑动窗口

对于 TCP 协议而言,它传输数据是基于字节流传输的。应用层在传输数据时,实际上会先将数据写入到 TCP 套接字的缓冲区,当缓冲区被写满后,数据才会被写出去。每个TCP Socket 在内核中都有一个发送缓冲区(SO_SNDBUF )和一个接收缓冲区(SO_RCVBUF),TCP 的全双工的工作模式以及 TCP 的滑动窗口便是依赖于这两个独立的 buffer 以及此 buffer 的填充状态。

SO_SNDBUF:

进程发送的数据的时候假设调用了一个 send 方法,将数据拷贝进入 Socket 的内核发送缓冲区之中,然后 send 便会在上层返回。换句话说,send 返回之时,数据不一定会发送到对端去(和write写文件有点类似),send 仅仅是把应用层 buffer 的数据拷贝进 Socket 的内核发送 buffer 中。

SO_RCVBUF:

把接收到的数据缓存入内核,应用进程一直没有调用 read 进行读取的话,此数据会一直缓存在相应 Socket 的接收缓冲区内。不管进程是否读取 Socket,对端发来的数据都会经由内核接收并且缓存到 Socket 的内核接收缓冲区之中。read 所做的工作,就是把内核缓冲区中的数据拷贝到应用层用户的 buffer 里面,仅此而已。

接收缓冲区保存收到的数据一直到应用进程读走为止。对于 TCP,如果应用进程一直没有读取,buffer 满了之后发生的动作是:通知对端 TCP 协议中的窗口关闭。这个便是滑动窗口的实现。保证 TCP 套接口接收缓冲区不会溢出,从而保证了 TCP 是可靠传输。因为对方不允许发出超过所通告窗口大小的数据。 这就是 TCP 的流量控制,如果对方无视窗口大小而发出了超过窗口大小的数据,则接收方 TCP 将丢弃它。

滑动窗口:

TCP连接在三次握手的时候,会将自己的窗口大小(window size)发送给对方,其实就是 SO_RCVBUF 指定的值。之后在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送。

每次发送数据后,发送方将自己维护的对方的 window size 减小,表示对方的 SO_RCVBUF 可用空间变小。

当接收方处理开始处理 SO_RCVBUF 中的数据时,会将数据从 Socket 在内核中的接受缓冲区读出,此时接收方的 SO_RCVBUF 可用空间变大,即 window size 变大,接受方会以 ack 消息的方式将自己最新的 window size 返回给发送方,此时发送方将自己的维护的接受的方的 window size 设置为ack消息返回的 window size。

此外,发送方可以连续的给接受方发送消息,只要保证对方的 SO_RCVBUF 空间可以缓存数据即可,即 window size>0。当接收方的 SO_RCVBUF 被填充满时,此时 window size=0,发送方不能再继续发送数据,要等待接收方 ack 消息,以获得最新可用的 window size。

2. MSS/MTU分片

MTU (Maxitum Transmission Unit,最大传输单元)是链路层对一次可以发送的最大数据的限制。MSS(Maxitum Segment Size,最大分段大小)是 TCP 报文中 data 部分的最大长度,是传输层对一次可以发送的最大数据的限制。

数据在传输过程中,每经过一层,都会加上一些额外的信息:

  • 应用层:只关心发送的数据 data,将数据写入 Socket 在内核中的缓冲区 SO_SNDBUF 即返回,操作系统会将 SO_SNDBUF 中的数据取出来进行发送;
  • 传输层:会在 data 前面加上 TCP Header(20字节);
  • 网络层:会在 TCP 报文的基础上再添加一个 IP Header,也就是将自己的网络地址加入到报文中。IPv4 中 IP Header 长度是 20 字节,IPV6 中 IP Header 长度是 40 字节;
  • 链路层:加上 Datalink Header 和 CRC。会将 SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和 Type 域加入。SMAC+DMAC+Type+CRC 总长度为 18 字节;
  • 物理层:进行传输。

在回顾这个基本内容之后,再来看 MTU 和 MSS。MTU 是以太网传输数据方面的限制,每个以太网帧最大不能超过 1518bytes。刨去以太网帧的帧头(DMAC+SMAC+Type域) 14Bytes 和帧尾 (CRC校验 ) 4 Bytes,那么剩下承载上层协议的地方也就是 data 域最大就只能有 1500 Bytes 这个值 我们就把它称之为 MTU。

MSS 是在 MTU 的基础上减去网络层的 IP Header 和传输层的 TCP Header 的部分,这就是 TCP 协议一次可以发送的实际应用数据的最大大小。

3. Nagle 算法

TCP/IP 协议中,无论发送多少数据,总是要在数据(data)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送 ACK 表示确认。

即使从键盘输入的一个字符,占用一个字节,可能在传输上造成 41 字节的包,其中包括 1 字节的有用信息和 40 字节的首部数据。这种情况转变成了 4000% 的消耗,这样的情况对于重负载的网络来是无法接受的。称之为"糊涂窗口综合征"。

为了尽可能的利用网络带宽,TCP 总是希望尽可能的发送足够大的数据。(一个连接会设置 MSS 参数,因此,TCP/IP 希望每次都能够以 MSS 尺寸的数据块来发送数据)。Nagle 算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。

Nagle 算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓 “小段”,指的是小于 MSS 尺寸的数据块;所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的 ACK 确认该数据已收到。

Nagle 算法的规则:

  1. 如果 SO_SNDBUF 中的数据长度达到 MSS,则允许发送;
  2. 如果该 SO_SNDBUF 中含有 FIN,表示请求关闭连接,则先将 SO_SNDBUF 中的剩余数据发送,再关闭;
  3. 设置了 TCP_NODELAY=true 选项,则允许发送。TCP_NODELAY 是取消 TCP 的确认延迟机制,相当于禁用了 Negale 算法。正常情况下,当 Server 端收到数据之后,它并不会马上向 client 端发送 ACK,而是会将 ACK 的发送延迟一段时间(一般是 40ms),它希望在 t 时间内 server 端会向 client 端发送应答数据,这样 ACK 就能够和应答数据一起发送,就像是应答数据捎带着 ACK 过去。当然,TCP 确认延迟 40ms 并不是一直不变的, TCP 连接的延迟确认时间一般初始化为最小值 40ms,随后根据连接的重传超时时间(RTO)、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置 TCP_QUICKACK 选项来取消确认延迟;
  4. 未设置 TCP_CORK 选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
  5. 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。

基于以上问题,TCP层肯定是会出现当次接收到的数据是不完整数据的情况。出现粘包可能的原因有:

  1. 发送方每次写入数据 < 套接字缓冲区大小;
  2. 接收方读取套接字缓冲区数据不够及时。

出现半包的可能原因有:

  1. 发送方每次写入数据 > 套接字缓冲区大小;
  2. 发送的数据大于协议 MTU,所以必须要拆包。

解决问题肯定不是在4层来做而是在应用层,通过定义通信协议来解决粘包和拆包的问题。发送方 和 接收方约定某个规则:

  1. 当发生粘包的时候通过某种约定来拆包;
  2. 如果在拆包,通过某种约定来将数据组成一个完整的包处理。

二、业界常用解决方案

1. 定长协议

指定一个报文具有固定长度。比如约定一个报文的长度是 5 字节,那么:

报文:1234,只有4字节,但是还差一个怎么办呢,不足部分用空格补齐。就变为:1234 。

如果不补齐空格,那么就会读到下一个报文的字节来填充上一个报文直到补齐为止,这样粘包了。

定长协议的优点是使用简单,缺点很明显:浪费带宽。

Netty 中提供了 FixedLengthFrameDecoder ,支持把固定的长度的字节数当做一个完整的消息进行解码。

2. 特殊字符分割协议

很好理解,在每一个你认为是一个完整的包的尾部添加指定的特殊字符,比如:\n,\r等等。

需要注意的是:约定的特殊字符要保证唯一性,不能出现在报文的正文中,否则就将正文一分为二了。

Netty 中提供了 DelimiterBasedFrameDecoder 根据特殊字符进行解码,LineBasedFrameDecoder默认以换行符作为分隔符。

3. 变长协议

变长协议的核心就是:将消息分为消息头和消息体,消息头中标识当前完整的消息体长度。

  1. 发送方在发送数据之前先获取数据的二进制字节大小,然后在消息体前面添加消息大小;
  2. 接收方在解析消息时先获取消息大小,之后必须读到该大小的字节数才认为是完整的消息。

Netty 中提供了 LengthFieldBasedFrameDecoder ,通过LengthFieldPrepender 来给实际的消息体添加 length 字段。

三、Netty粘包和拆包解决方案

为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器

  • FixedLengthFrameDecoder(使用定长的报文来分包)
  • DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包)
  • LineBasedFrameDecoder(数据未尾添加回车换行符来分包)
  • LengthFieldBasedFrameDecoder(使用消息头和消息体来分包)
  • ByteToMessageDecoder(如果想实现自己的半包解码器,实现该类)
  • MessageToMessageDecoder(一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类)
  • StringDecoder(字符串解码器)
  • ProtoBufVarint32FrameDecoder(通过 Protobuf 解码器来区分整包消息)
  • ProtobufDecoder(Protobuf 解码器)

Netty 还内置了如下的编码器

  • MessageToByteEncoder(将 Java 对象编码成 ByteBuf)
  • MessageToMessageEncoder(如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个)
  • LengthFieldPrepender(如果我们在发送消息的时候采用:消息长度字段+原始消息的形式,我们就可以使用 LengthFieldPrepender。因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节)
  • ProtobufVarint32LengthFieldPrepender(Protobuf 编码器,在原来的数据前面,追加一个长度)
  • ProtobufEncoder(Protobuf 编码器)

LineBasedFrameDecoder 使用

LineBasedFrameDecoder 通过在包尾添加回车换行符\r\n 来区分整包消息

复制代码
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    // 将分隔之后的字节数据转换为字符串数据
    ch.pipeline().addLast(new StringDecoder());
    // 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符
    ch.pipeline().addLast(new LineBasedFrameEncoder());
    // 最终处理数据并且返回响应的handler
    ch.pipeline().addLast(new EchoServerHandler());
}

FixedLengthFrameDecoder解码器

对于使用固定长度的粘包和拆包场景,可以使用FixedLengthFrameDecoder,该解码器会每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足

public class EchoServer {
  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 这里将FixedLengthFrameDecoder添加到pipeline中,指定长度为20
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            // 将前一步解码得到的数据转码为字符串
            ch.pipeline().addLast(new StringDecoder());
            // 这里FixedLengthFrameEncoder是我们自定义的,用于将长度不足20的消息进行补全空格
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            // 最终的数据处理
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });
      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

DelimiterBasedFrameDecoder解码器

DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    // 将delimiter设置到DelimiterBasedFrameDecoder中,经过该解码一器进行处理之后,源数据将会
    // 被按照_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,若还是未
    // 读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.getBytes())));
    // 将分隔之后的字节数据转换为字符串数据
    ch.pipeline().addLast(new StringDecoder());
    // 这是我们自定义的一个编码器,主要作用是在返回的响应数据最后添加分隔符
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    // 最终处理数据并且返回响应的handler
    ch.pipeline().addLast(new EchoServerHandler());
}

LengthFieldBasedFrameDecoder 使用

LengthFieldBasedFrameDecoder相对就高端一点。前面我们使用到的拆包都是基于一些约定来做的,比如固定长度,特殊分隔符,这些方案总是有一定的弊端。最好的方案就是:发送方告诉我当前消息总长度,接收方如果没有收到该长度大小的数据就认为是没有收完继续等待。

* @param maxFrameLength 帧的最大长度
     *        
     * @param lengthFieldOffset 长度字段偏移的地址
     *        
     * @param lengthFieldLength 长度字段所占的字节长
     *        修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,
     *        若为负数,则说明要推后多少个字段
     * @param lengthAdjustment 解析时候跳过多少个长度
     *
     * @param initialBytesToStrip 解码出一个数据包之后,去掉开头的字节数
     *        
     * @param initialBytesToStrip  为true,当frame长度超过maxFrameLength时立即报
     *                   TooLongFrameException异常,为false,读取完整个帧再报异
     *        
     */
public LengthFieldBasedFrameDecoder(
  int maxFrameLength,
  int lengthFieldOffset, int lengthFieldLength,
  int lengthAdjustment, int initialBytesToStrip) {
  this(
    maxFrameLength,
    lengthFieldOffset, lengthFieldLength, lengthAdjustment,
    initialBytesToStrip, true);
}
public class EchoServer {
  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
            // 进行长度字段解码,这里也会对数据进行粘包和拆包处理
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 对经过粘包和拆包处理之后的数据进行json反序列化,从而得到User对象
            ch.pipeline().addLast(new JsonDecoder());
            // 对响应数据进行编码,主要是将User对象序列化为json
            ch.pipeline().addLast(new JsonEncoder());
            // 处理客户端的请求的数据,并且进行响应
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });
      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}




📢文章下方有交流学习区!一起学习进步!也可以前往官网,加入官方微信交流群💪💪💪

📢创作不易,如果觉得文章不错,可以点赞👍收藏📁评论📒

📢你的支持和鼓励是我创作的动力❗❗❗  

官网Doker 多克;官方旗舰店官方旗舰店  全品优惠

目录
相关文章
|
4月前
|
编解码 缓存 移动开发
TCP粘包/拆包与Netty解决方案
TCP粘包/拆包与Netty解决方案
46 0
|
7月前
|
移动开发 网络协议 算法
由浅入深Netty粘包与半包解决方案
由浅入深Netty粘包与半包解决方案
52 0
|
19天前
|
网络协议 Java 物联网
Spring Boot与Netty打造TCP服务端(解决粘包问题)
Spring Boot与Netty打造TCP服务端(解决粘包问题)
27 1
|
3月前
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
48 0
|
3月前
|
网络协议
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
53 0
|
4月前
|
存储 网络协议 算法
Netty使用篇:半包&粘包
Netty使用篇:半包&粘包
|
4月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
43 1
|
8月前
|
Nacos
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制
107 0
|
7月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
40 0
|
7月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
48 0