Netty Review - 优化Netty通信:如何应对粘包和拆包挑战

简介: Netty Review - 优化Netty通信:如何应对粘包和拆包挑战


概述


Pre

Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力


概述

粘包和拆包是在计算机网络通信中常见的问题,特别是在使用基于流的传输协议(如TCP)时。这两个问题涉及到数据在传输过程中的组织和解析。

  1. 粘包(Packet Concatenation):
  • 定义: 粘包指的是发送方发送的多个小数据包在接收方看来被组合成一个大的数据包。
  • 原因: 发送方连续发送的数据可能在网络中被合并成一个数据流,导致接收方无法准确分辨每个数据包的边界。
  • 可能的解决方案: 使用特殊的分隔符标记数据包的边界,或者在数据包中包含长度信息。
  1. 拆包(Packet Fragmentation):
  • 定义: 拆包是指接收方接收到的数据包过大,被拆分成多个较小的数据包。
  • 原因: 数据包在传输过程中可能被分割,到达接收方时需要重新组装。
  • 可能的解决方案: 在数据包中包含长度信息,或者使用特殊的标记表示数据包的边界。

在处理粘包和拆包问题时,通信双方需要协调一致,以确保数据的正确性和完整性。使用合适的协议和通信模式,以及采用适当的分隔符或长度字段,有助于减轻或解决这些问题。


TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区

的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成

一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。

如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。

比如


正常情况:


发生了粘包:


发生了拆包:

或者


场景复现

我们的代码还是以 Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力中的代码为基础,演示一下粘包拆包


启动Server 和 Client ()

【TalkRoomClient2】发送10条消息

package com.artisan.pack;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class TalkRoomClient2 {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包)
                            //pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes())));
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new TalkRoomClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            // 模拟 拆包粘包
            for (int i = 0; i < 10; i++) {
                channel.writeAndFlush("小工匠123");
            }
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

【TalkRoomClient】接收 Client2 ---- Server — 自己的消息

package com.artisan.pack;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class TalkRoomClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包)
                            //pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes())));
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new TalkRoomClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            //客户端需要输入信息, 创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过 channel 发送到服务器端
                channel.writeAndFlush(msg);
            }
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

【测试】

出现了粘包和拆包的现象



解决办法概览

1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格

2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不

能出现分隔符。

3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度

来判断每条数据的开始和结束。 (推荐方案)

Netty提供了多个解码器,可以进行分包的操作,如下:

  • LineBasedFrameDecoder (回车换行分包)
  • DelimiterBasedFrameDecoder(特殊分隔符分包)
  • FixedLengthFrameDecoder(固定长度报文来分包)

我们先使用第二种方案来描述一下

方式一: 特殊分隔符分包 (演示Netty提供的众多方案中的一种)

我们来看下如何改造?

【TalkRoomServer 】

重点关注的地方是DelimiterBasedFrameDecoder,这是一个基于分隔符的帧解码器,用于处理客户端发送的按照特定分隔符(在这里是下划线_)分割的数据包。

package com.artisan.pack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class TalkRoomServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建主事件循环组,用于接受进来的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建工作事件循环组,用于处理已接受连接的IO操作
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 配置服务器
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel接受进来的连接
                    .option(ChannelOption.SO_BACKLOG, 1024) // 设置连接队列大小
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 配置子通道初始化器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 分隔符解码器,按照下划线拆包
                            pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes())));
                            // 添加字符串解码器
                             pipeline.addLast("decoder", new StringDecoder());
                            // 添加字符串编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            // 添加自定义的业务处理handler
                            pipeline.addLast(new TalkRoomServerHandler());
                        }
                    });
            // 绑定端口并同步等待成功,然后返回ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(1234).sync();
            // 打印服务器启动成功信息
            System.out.println("Talk Room Server启动成功,监听1234端口");
            // 等待服务器socket关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 释放资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

initChannel方法中,DelimiterBasedFrameDecoder被加入到管道中。它用于接收按分隔符(这里是下划线_)分割的数据包,并把这些数据包转换成一个个的Frame对象,这样就可以在后续的处理器中逐个处理这些数据包了。这种方式的优点是可以有效处理大量且不定长度的数据包,而不需要担心数据包过大导致内存溢出的问题。


【TalkRoomServerHandler】

因为我们队数据进行了加工转发,所以加工后的消息,也得按照DelimiterBasedFrameDecoder的处理规则增加 “_”


同样的Client的Pipeline中别忘了增加解码器


启动Server和Client ,我们来测试下


附上其他的代码

package com.artisan.pack;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class TalkRoomClient2 {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包)
                            pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes())));
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new TalkRoomClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            // 模拟 拆包粘包
            for (int i = 0; i < 10; i++) {
                channel.writeAndFlush("小工匠123" + Delimiter.SPLIT);
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}
package com.artisan.pack;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class TalkRoomClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 分隔符解码器 (用于测试 按照 _ 分隔符 拆包)
                            pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer(Delimiter.SPLIT.getBytes())));
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new TalkRoomClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();
            //得到 channel
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            //客户端需要输入信息, 创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过 channel 发送到服务器端
                channel.writeAndFlush(msg);
            }
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
package com.artisan.pack;
/**
 * @author artisan
 */
public interface Delimiter {
   String  SPLIT = "_" ;
}

流程分析


方式二: 发送长度(推荐)

TODO


DelimiterBasedFrameDecoder 源码分析

A decoder that splits the received ByteBufs by one or more delimiters. It is particularly useful for decoding the frames which ends with a delimiter such as NUL or newline characters.
Predefined delimiters
Delimiters defines frequently used delimiters for convenience' sake.
Specifying more than one delimiter
DelimiterBasedFrameDecoder allows you to specify more than one delimiter. If more than one delimiter is found in the buffer, it chooses the delimiter which produces the shortest frame. For example, if you have the following data in the buffer:
  +--------------+
  | ABC\nDEF\r\n |
  +--------------+
a DelimiterBasedFrameDecoder(Delimiters.lineDelimiter()) will choose '\n' as the first delimiter and produce two frames:
  +-----+-----+
  | ABC | DEF |
  +-----+-----+
rather than incorrectly choosing '\r\n' as the first delimiter:
  +----------+
  | ABC\nDEF |
  +----------+
/**
     * 从{@link ByteBuf}中创建一个帧并返回。
     *
     * @param   ctx             此{@link ByteToMessageDecoder}所属的{@link ChannelHandlerContext}
     * @param   buffer          要从中读取数据的{@link ByteBuf}
     * @return  frame           表示帧的{@link ByteBuf},如果没有帧可创建,则返回{@code null}
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (lineBasedDecoder != null) {
            // 如果设置了行解码器,则使用行解码器进行解码
            return lineBasedDecoder.decode(ctx, buffer);
        }
        // 尝试所有的分隔符,并选择产生最短帧的分隔符
        int minFrameLength = Integer.MAX_VALUE;
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
                minDelim = delim;
            }
        }
        if (minDelim != null) {
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;
            if (discardingTooLongFrame) {
                // 刚刚丢弃了一个非常大的帧
                // 回到初始状态
                discardingTooLongFrame = false;
                buffer.skipBytes(minFrameLength + minDelimLength);
                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }
            if (minFrameLength > maxFrameLength) {
                // 丢弃读取的帧
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength);
                return null;
            }
            if (stripDelimiter) {
                // 如果需要去除分隔符,则从buffer中读取帧
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                // 否则,直接从buffer中读取包含分隔符的帧
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }
            return frame;
        } else {
            if (!discardingTooLongFrame) {
                if (buffer.readableBytes() > maxFrameLength) {
                    // 丢弃buffer中的内容,直到找到分隔符
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true;
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            } else {
                // 由于没有找到分隔符,仍在丢弃buffer
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }

这段代码是DelimiterBasedFrameDecoder类的decode方法的实现。这个方法的主要作用是根据指定的分隔符将输入的ByteBuf对象中的数据分割成一个个的帧。

首先,我们来看一下方法的定义:

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    // ...
}

decode方法接收两个参数:

  1. ctx:解码器所在的ChannelHandlerContext对象。
  2. buffer:待解码的ByteBuf对象。

接下来,我们逐行解析代码并添加中文注释:

if (lineBasedDecoder != null) {
    return lineBasedDecoder.decode(ctx, buffer);
}

如果存在行基于的解码器,则使用该解码器进行解码。

int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
    int frameLength = indexOf(buffer, delim);
    if (frameLength >= 0 && frameLength < minFrameLength) {
        minFrameLength = frameLength;
        minDelim = delim;
    }
}

遍历所有的分隔符,并找到能产生最短帧的分隔符。

if (minDelim != null) {
    // ...
} else {
    // ...
}

如果找到了分隔符,则根据分隔符分割数据;如果没有找到分隔符,则跳过超过最大帧长度的数据。

if (discardingTooLongFrame) {
    // ...
} else {
    // ...
}

如果正在丢弃过长的帧,则回到初始状态;否则,检查当前帧长度是否超过最大帧长度。

if (stripDelimiter) {
    frame = buffer.readRetainedSlice(minFrameLength);
    buffer.skipBytes(minDelimLength);
} else {
    frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}

根据stripDelimiter的值来确定是否需要去除分隔符。

return frame;

返回分割后的帧。

if (!discardingTooLongFrame) {
    // ...
} else {
    // ...
}

如果不在丢弃过长的帧,则检查缓冲区中可读字节数是否超过最大帧长度;否则,继续丢弃缓冲区中的数据。

tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());

累加过长的帧长度,并跳过过长的数据。

return null;

如果没有找到分隔符,则返回null

通过以上代码,DelimiterBasedFrameDecoder可以根据指定的分隔符将输入的ByteBuf对象中的数据分割成一个个的帧。这样,就可以在后续的处理器中逐个处理这些帧了。


相关文章
|
4月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
4月前
|
编解码 监控 网络协议
Netty优化
Netty优化
68 3
|
4月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
188 1
|
4月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
241 2
|
6月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
47 0
|
6月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
56 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13521 1
|
7月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
140 1
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
175 1
|
7月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
170 0