Netty中粘包拆包问题解决探讨

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Netty中粘包拆包问题解决探讨

⭐️ 前言

开发的小伙伴们对于Netty并不陌生,本文就Netty粘包拆包问题及其解决方案做一个介绍,希望能对大家有所帮助。

⭐️ 什么是粘包拆包问题

我们知道,传统的IO是面向流的,而Netty(它的底层是Java NIO)是面向Buffer的。所以当发送很多体量比较小的消息时,消息会堆积在Buffer(例如send buffer)中,触发Flush后才会真正在网路上传输数据。这种情况下,接收端会接收到很多连在一起的体量较小的消息,这就产生了粘包;另外,当需要发送的消息体量较大,大到超出Buffer的最大容量时,只能先发送消息的一部分,剩下的部分会在稍晚一些发送,这样,一个大体量的消息被拆开了,于是就产生了拆包问题。

下文会演示粘包现象,并探讨粘包拆包的解决方案。

⭐️ netty handler 执行顺序

在演示之前,我们先来看看netty handler 的执行顺序,handler可以通过ChannelPipeline的addLast方法按顺序添加。总体来说,接收消息会沿着handler链路从前往后寻找InboundHandler依次处理;发送消息时,会沿着handler链路从后往前寻找OutboundHandler依次处理,若在handler链路中间的某个InboundHandler发送数据,则分两种情况:

1、调用ctx.writeAndFlush,从当前handler沿链路向前寻找OutboundHandler依次处理

2、调用ctx.channel().writeAndFlush,从handler链路的tail向前寻找OutboundHandler依次处理

⭐️ 日志设置

日志很重要,在java项目中,日志需要两个组件,日志门面和日志实现,日志门面这里采用slf4j,具体日志实现选择log4j,依赖如下,这里顺便给出netty的依赖。

<dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.59.Final</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

log4j.properties对log进行相关设置

log4j.rootLogger=DEBUG,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss,SSS} [%p]%C{1}-%m%n

⭐️ 粘包演示

这里客户端发送了三条消息,在最后一条消息发送时进行flush操作。

server端代码

public class Server {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new LoggingHandler())
                                        .addLast(new StringDecoder())
                                        .addLast(new ServerTestHandler());
                            }
                        });
                System.out.println("server ready");
                ChannelFuture sync = bootstrap.bind(8888).sync();
                sync.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
}

客户端代码

public class Client {
    public static void main(String[] args) {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LoggingHandler())
                                    .addLast(new StringEncoder(CharsetUtil.UTF_8));
                        }
                    });
            System.out.println("client ok");
            ChannelFuture localhost = bootstrap.connect("localhost", 8888).sync();
            // 发送消息
            String msg1 = "hello world";
            localhost.channel().write(msg1);
            String msg2 = "my name is eryx";
            localhost.channel().write(msg2);
            String msg3 = "i am robot";
            localhost.channel().writeAndFlush(msg3);
            localhost.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

这里定义一个ServerTestHandler,用以显示客户端发给服务端的数据

public class ServerTestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("来自client的消息:" + String.valueOf(msg));
    }
}

客户端日志

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
2023/11/17 12:04:15,780 [DEBUG]AbstractInternalLogger-[id: 0x07a76a5a, L:/127.0.0.1:54667 - R:localhost/127.0.0.1:8888] WRITE: 15B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6d 79 20 6e 61 6d 65 20 69 73 20 65 72 79 78    |my name is eryx |
+--------+-------------------------------------------------+----------------+
2023/11/17 12:04:15,780 [DEBUG]AbstractInternalLogger-[id: 0x07a76a5a, L:/127.0.0.1:54667 - R:localhost/127.0.0.1:8888] WRITE: 10B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 20 61 6d 20 72 6f 62 6f 74                   |i am robot      |
+--------+-------------------------------------------------+----------------+
2023/11/17 12:04:15,780 [DEBUG]AbstractInternalLogger-[id: 0x07a76a5a, L:/127.0.0.1:54667 - R:localhost/127.0.0.1:8888] FLUSH

服务端日志

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 6d 79 20 6e 61 |hello worldmy na|
|00000010| 6d 65 20 69 73 20 65 72 79 78 69 20 61 6d 20 72 |me is eryxi am r|
|00000020| 6f 62 6f 74                                     |obot            |
+--------+-------------------------------------------------+----------------+
来自client的消息:hello worldmy name is eryxi am robot

可以看到,在客户端分3次写入的消息,服务端接收到时,三条消息连接在了一起,发生了粘包问题。

⭐️ 如何解决粘包拆包

为解决粘拆包问题,netty提供了一些解码器,这里介绍两个:

1、FixedLengthFrameDecoder 固定长度帧解码器

2、LengthFieldBasedFrameDecoder 以长度字段为基础的解码器

FixedLengthFrameDecoder

FixedLengthFrameDecoder对于固定长度的消息很方便,我们对server做一些调整,这里客户端发送的虽然不是固定长度消息,但可以更清洗的理解FixedLengthFrameDecoder的效果。

server端代码

public class Server {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new LoggingHandler())
                                        // 以5为帧的固定长度
                                        .addLast(new FixedLengthFrameDecoder(5))
                                        .addLast(new StringDecoder())
                                        .addLast(new ServerTestHandler());
                            }
                        });
                System.out.println("server ready");
                ChannelFuture sync = bootstrap.bind(8888).sync();
                sync.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
}

客户端的代码和日志均与上一小节相同,而server端的日志输出如下:

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64 6d 79 20 6e 61 |hello worldmy na|
|00000010| 6d 65 20 69 73 20 65 72 79 78 69 20 61 6d 20 72 |me is eryxi am r|
|00000020| 6f 62 6f 74                                     |obot            |
+--------+-------------------------------------------------+----------------+
来自client的消息:hello
来自client的消息: worl
来自client的消息:dmy n
来自client的消息:ame i
来自client的消息:s ery
来自client的消息:xi am
来自client的消息: robo

可见,server端解析的消息,每一条都有5个字符。

LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder包含一些参数,说明如下:

(1) maxFrameLength:发送的数据包最大长度

(2) lengthFieldOffset:长度域偏移量,指的是长度域位于整个数据包字节数组中的下标

(3) lengthFieldLength:长度域的字节数长度

(4) lengthAdjustment:长度域的偏移量矫正

(5) initialBytesToStrip:丢弃的起始字节数

在只关注消息本身和其长度的情况下,LengthFieldBasedFrameDecoder可以和LengthFieldPrepender配合使用,LengthFieldPrepender可以指定一个参数,即长度域的字节数长度。

server端代码

public class Server {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast(new LoggingHandler())
                                        .addLast(new LengthFieldBasedFrameDecoder(1024,0, 4, 0, 4))
                                        .addLast(new StringDecoder())
                                        .addLast(new ServerTestHandler());
                            }
                        });
                System.out.println("server ready");
                ChannelFuture sync = bootstrap.bind(8888).sync();
                sync.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
}

client端代码

public class Client {
    public static void main(String[] args) {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new LoggingHandler())
                                    .addLast(new LengthFieldPrepender(4))
                                    .addLast(new StringEncoder(CharsetUtil.UTF_8));
                        }
                    });
            System.out.println("client ok");
            ChannelFuture localhost = bootstrap.connect("localhost", 8888).sync();
            // 发送消息
            String msg1 = "hello world";
            localhost.channel().write(msg1);
            String msg2 = "my name is eryx";
            localhost.channel().write(msg2);
            String msg3 = "i am robot";
            localhost.channel().writeAndFlush(msg3);
            localhost.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

客户端日志

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0b                                     |....            |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,338 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] WRITE: 11B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 20 77 6f 72 6c 64                |hello world     |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,341 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] WRITE: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0f                                     |....            |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,342 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] WRITE: 15B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6d 79 20 6e 61 6d 65 20 69 73 20 65 72 79 78    |my name is eryx |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,342 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] WRITE: 4B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0a                                     |....            |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,342 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] WRITE: 10B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 20 61 6d 20 72 6f 62 6f 74                   |i am robot      |
+--------+-------------------------------------------------+----------------+
2023/11/17 15:16:19,342 [DEBUG]AbstractInternalLogger-[id: 0x5598fbcb, L:/127.0.0.1:50147 - R:localhost/127.0.0.1:8888] FLUSH

服务端日志

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 0b 68 65 6c 6c 6f 20 77 6f 72 6c 64 00 |....hello world.|
|00000010| 00 00 0f 6d 79 20 6e 61 6d 65 20 69 73 20 65 72 |...my name is er|
|00000020| 79 78 00 00 00 0a 69 20 61 6d 20 72 6f 62 6f 74 |yx....i am robot|
+--------+-------------------------------------------------+----------------+
来自client的消息:hello world
来自client的消息:my name is eryx
来自client的消息:i am robot
2023/11/17 15:16:19,383 [DEBUG]AbstractInternalLogger-[id: 0xb4d43375, L:/127.0.0.1:8888 - R:/127.0.0.1:50147] READ COMPLETE

可见,每一天消息的都被正确的解析了,消息的界限明确了,粘包问题解决了!!!

笔者水平有限,若有不对的地方欢迎评论指正!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
3月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
3月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
183 2
|
5月前
|
消息中间件 存储 网络协议
拼多多面试:Netty如何解决粘包问题?
粘包和拆包问题也叫做粘包和半包问题,**它是指在数据传输时,接收方未能正常读取到一条完整数据的情况(只读取了部分数据,或多读取到了另一条数据的情况)就叫做粘包或拆包问题。** 从严格意义上来说,粘包问题和拆包问题属于两个不同的问题,接下来我们分别来看。 ## 1.粘包问题 粘包问题是指在网络通信中,发送方连续发送的多个小数据包被接收方一次性接收的现象。这可能是因为底层传输层协议(如 TCP)会将多个小数据包合并成一个大的数据块进行传输,导致接收方在接收数据时一次性接收了多个数据包,造成粘连。 例如以下案例,正常情况下客户端发送了两条消息,分别为“ABC”和“DEF”,那么接收端也应该收到两
38 0
拼多多面试:Netty如何解决粘包问题?
|
5月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
41 0
|
5月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
49 0
|
6月前
|
网络协议 Java 物联网
Spring Boot与Netty打造TCP服务端(解决粘包问题)
Spring Boot与Netty打造TCP服务端(解决粘包问题)
990 2
|
缓存 Java 编解码
netty之粘包分包的处理
  1、netty在进行字节数组传输的时候,会出现粘包和分包的情况。当个数据还好,如果数据量很大。并且不间断的发送给服务器,这个时候就会出现粘包和分包的情况。   2、简单来说:channelBuffer在接收包的时候,会在当时进行处理,但是当数据量一大,这个时候数据的分隔就不是很明显了。
1898 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13496 1
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
127 1
|
11月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
159 1