netty粘包问题分析

简介: netty粘包问题分析

Netty的粘包问题

在网络通信中,粘包问题是一种常见的情况,特别是在使用TCP协议进行数据传输时。本文将探讨Netty中的粘包问题及其解决方案,帮助读者更好地理解和应对这一问题。

1. 粘包问题介绍
1.1 什么是粘包问题

粘包问题是指发送端在将多个数据包连续发送到接收端时,接收端在接收数据时可能会将多个数据包粘合在一起,导致数据解析错误或丢失的情况。

1.2 粘包问题的影响

粘包问题可能导致接收端无法准确解析数据,从而影响系统的正常运行,甚至引发数据错误或丢失。

2. 粘包问题的原因
2.1 TCP协议的特点

TCP协议是面向连接的、可靠的数据传输协议,其特性会导致粘包问题的产生。

2.2 发送端造成的粘包

发送端在短时间内连续发送多个数据包时,可能会导致多个数据包在传输过程中被合并成一个数据包。

2.3 接收端造成的粘包

接收端在接收数据时,可能会将多个数据包一次性读取到缓冲区,造成粘包现象。

3. 粘包问题的解决方案
3.1 固定长度消息

通过设置固定长度的消息格式,使得每个数据包的长度固定,从而解决粘包问题。

服务端代码示例

ch.pipeline().addLast(new FixedLengthFrameDecoder(16));

服务端完整代码

// 示例代码
/**
 * @description: 固定长度方式解决粘包 问题服务端示例
 * @author: xz
 */
@Slf4j
public class NettyFixLengthServer {
    public static void main(String[] args) {
        new NettyFixLengthServer().start();
    }
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                     //调整 netty 的接受缓冲区(byteBuf)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
                    .group(boss, worker)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //设置定长解码器,位置必须再LoggingHandler之前,作用让所有数据包长度固定(假设长度为 16 字节)
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                //会在连接channel建立成功后,触发active事件
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    log.debug("connected>>>>>>>>>>>>>>>> {}", ctx.channel());
                                    super.channelActive(ctx);
                                }
                                @Override
                                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                    log.debug("disconnect>>>>>>>>>>>>>>>> {}", ctx.channel());
                                    super.channelInactive(ctx);
                                }
                            });
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{}>>>>>>>>>>>>>>>> binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{}>>>>>>>>>>>>>>>> bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug(">>>>>>>>>>>>>>>>stoped");
        }
    }
}

客户端代码

/**
 * @description: 固定长度方式解决粘包 问题客户端示例
 * @author: xz
 */
@Slf4j
public class NettyFixLengthClient {
    public static void main(String[] args) {
        send();
    }
    //剩余位置用下划线填充
    public static byte[] fill10Bytes(char c,int len){
        byte[] bytes = new byte[16];
        Arrays.fill(bytes, (byte) '_');
        for (int i = 0; i < len; i++) {
            bytes[i] = (byte) c;
        }
        System.out.println(new String(bytes));
        return bytes;
    }
    private static void send() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            //设置ByteBuf
                            ByteBuf buffer = ctx.alloc().buffer();
                            // 发送内容随机的数据包
                            Random r = new Random();
                            char c = '0';
                            for (int i = 0; i < 10; i++) {
                                //剩余位置用下划线填充方法
                                byte[] bytes =fill10Bytes(c,r.nextInt(16)+1);
                                c++;
                                //写入到ByteBuf
                                buffer.writeBytes(bytes);
                            }
                            ctx.writeAndFlush(buffer);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}
3.2 分隔符消息

在数据包之间添加特定的分隔符,通过分隔符来区分不同的数据包,从而解决粘包问题。

String recieveStr = ""$$$$这是一条粘包的消息&&&&$$$$这是一条粘包的消息&&&&"";
        if (recieveStr.contains("&&&&$$$$")) {
            log.debug("发生粘包");
            //去掉首尾的分隔符
            recieveStr = recieveStr.substring(4, recieveStr.length() - 4);
            //将分离好的消息写入字符串数组
            String[] totalBag = recieveStr.split("&&&&$$$$");
            for (String thisTotalBag : totalBag) {                
                //每条消息的业务逻辑。。。        
           
            }
        }
3.3 消息头+消息体

在消息中添加头部信息,包括消息长度等信息,接收端根据头部信息来解析数据包,从而解决粘包问题。

3.4 使用消息结束标志

在每个数据包的末尾添加特定的结束标志,接收端根据结束标志来识别数据包的边界,从而解决粘包问题。netty使用tcp/ip协议传输数据。而tcp/ip协议是类似水流一样的数据传输方式。多次访问的时候有可能出现数据粘包的问题。客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘KaTeX parse error: Expected group after '_' at position 1: _̲’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。bootstrap.childHandler()方法,在该方法中,定义了一个ChannelHandler[] acceptorHandlers = new ChannelHandler[3];数组的三个元素分别对应:自定义结束符、自定义编码、自定义处理器。最后,将这个数组作为参数进行传递。

  • 服务端代码
public class Server4Delimiter {
  // 监听线程组,监听客户端请求
  private EventLoopGroup acceptorGroup = null;
  // 处理客户端相关操作线程组,负责处理与客户端的数据通讯
  private EventLoopGroup clientGroup = null;
  // 服务启动相关配置信息
  private ServerBootstrap bootstrap = null;
  public Server4Delimiter(){
    init();
  }
  private void init(){
    acceptorGroup = new NioEventLoopGroup();
    clientGroup = new NioEventLoopGroup();
    bootstrap = new ServerBootstrap();
    // 绑定线程组
    bootstrap.group(acceptorGroup, clientGroup);
    // 设定通讯模式为NIO
    bootstrap.channel(NioServerSocketChannel.class);
    // 设定缓冲区大小
    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
    bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
      .option(ChannelOption.SO_RCVBUF, 16*1024)
      .option(ChannelOption.SO_KEEPALIVE, true);
  }
  public ChannelFuture doAccept(int port) throws InterruptedException{
    
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        // 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
        ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
        ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
        // 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
        // 必须每次初始化通道时创建一个新对象
        // 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
        acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
        // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
        acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
        acceptorHandlers[2] = new Server4DelimiterHandler();
        ch.pipeline().addLast(acceptorHandlers);
      }
    });
    ChannelFuture future = bootstrap.bind(port).sync();
    return future;
  }
  public void release(){
    this.acceptorGroup.shutdownGracefully();
    this.clientGroup.shutdownGracefully();
  }
  
  public static void main(String[] args){
    ChannelFuture future = null;
    Server4Delimiter server = null;
    try{
      server = new Server4Delimiter();
      
      future = server.doAccept(9999);
      System.out.println("server started.");
      future.channel().closeFuture().sync();
    }catch(InterruptedException e){
      e.printStackTrace();
    }finally{
      if(null != future){
        try {
          future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      
      if(null != server){
        server.release();
      }
    }
  }
  
}
 
 
public class Server4DelimiterHandler extends ChannelHandlerAdapter {
  
  // 业务处理逻辑
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String message = msg.toString();
    System.out.println("from client : " + message);
    String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
    ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
  }
  
 
  // 异常处理逻辑
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("server exceptionCaught method run...");
    // cause.printStackTrace();
    ctx.close();
  }
 
}
  • 客户端代码
public class Client4Delimiter {
  
  // 处理请求和处理服务端响应的线程组
  private EventLoopGroup group = null;
  // 服务启动相关配置信息
  private Bootstrap bootstrap = null;
  
  public Client4Delimiter(){
    init();
  }
  
  private void init(){
    group = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    // 绑定线程组
    bootstrap.group(group);
    // 设定通讯模式为NIO
    bootstrap.channel(NioSocketChannel.class);
  }
  
  public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        // 数据分隔符
        ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
        ChannelHandler[] handlers = new ChannelHandler[3];
        handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
        // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
        handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
        handlers[2] = new Client4DelimiterHandler();
        
        ch.pipeline().addLast(handlers);
      }
    });
    ChannelFuture future = this.bootstrap.connect(host, port).sync();
    return future;
  }
  
  public void release(){
    this.group.shutdownGracefully();
  }
  
  public static void main(String[] args) {
    Client4Delimiter client = null;
    ChannelFuture future = null;
    try{
      client = new Client4Delimiter();
      
      future = client.doRequest("localhost", 9999);
      
      Scanner s = null;
      while(true){
        s = new Scanner(System.in);
        System.out.print("enter message send to server > ");
        String line = s.nextLine();
        future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
        TimeUnit.SECONDS.sleep(1);
      }
    }catch(Exception e){
      e.printStackTrace();
    }finally{
      if(null != future){
        try {
          future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      if(null != client){
        client.release();
      }
    }
  }
  
}
 
 
 
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
 
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try{
      String message = msg.toString();
      System.out.println("from server : " + message);
    }finally{
      // 用于释放缓存。避免内存溢出
      ReferenceCountUtil.release(msg);
    }
  }
 
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("client exceptionCaught method run...");
    // cause.printStackTrace();
    ctx.close();
  }
 
}
3.5 使用消息定长协议

利用Netty提供的LengthFieldBasedFrameDecoder等解码器,处理定长消息,从而解决粘包问题。

public class Client4Delimiter {
  
  // 处理请求和处理服务端响应的线程组
  private EventLoopGroup group = null;
  // 服务启动相关配置信息
  private Bootstrap bootstrap = null;
  
  public Client4Delimiter(){
    init();
  }
  
  private void init(){
    group = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    // 绑定线程组
    bootstrap.group(group);
    // 设定通讯模式为NIO
    bootstrap.channel(NioSocketChannel.class);
  }
  
  public ChannelFuture doRequest(String host, int port) throws InterruptedException{
    this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        // 数据分隔符
        ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
        ChannelHandler[] handlers = new ChannelHandler[3];
        handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
        // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
        handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
        handlers[2] = new Client4DelimiterHandler();
        
        ch.pipeline().addLast(handlers);
      }
    });
    ChannelFuture future = this.bootstrap.connect(host, port).sync();
    return future;
  }
  
  public void release(){
    this.group.shutdownGracefully();
  }
  
  public static void main(String[] args) {
    Client4Delimiter client = null;
    ChannelFuture future = null;
    try{
      client = new Client4Delimiter();
      
      future = client.doRequest("localhost", 9999);
      
      Scanner s = null;
      while(true){
        s = new Scanner(System.in);
        System.out.print("enter message send to server > ");
        String line = s.nextLine();
        future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
        TimeUnit.SECONDS.sleep(1);
      }
    }catch(Exception e){
      e.printStackTrace();
    }finally{
      if(null != future){
        try {
          future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      if(null != client){
        client.release();
      }
    }
  }
  
}
 
 
 
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
 
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try{
      String message = msg.toString();
      System.out.println("from server : " + message);
    }finally{
      // 用于释放缓存。避免内存溢出
      ReferenceCountUtil.release(msg);
    }
  }
 
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("client exceptionCaught method run...");
    // cause.printStackTrace();
    ctx.close();
  }
 
}
4. Netty中的粘包问题与解决方案
4.1 Netty中的粘包问题

分析Netty中可能出现的粘包问题。

4.2 Netty中的解决方案

介绍Netty提供的解决粘包问题的方案,如DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等。

5. 注意事项与总结
5.1 注意事项

在处理粘包问题时需要注意的事项。

5.2 总结

总结解决粘包问题的方法及其优缺点,以及在实践中的应用。

通过本文的学习,读者可以更深入地了解Netty中的粘包问题及其解决方案,从而更好地应对实际开发中的网络通信挑战。

相关文章
|
1月前
|
编解码 缓存 移动开发
TCP粘包/拆包与Netty解决方案
TCP粘包/拆包与Netty解决方案
57 0
|
9月前
|
移动开发 网络协议 算法
由浅入深Netty粘包与半包解决方案
由浅入深Netty粘包与半包解决方案
57 0
|
2天前
|
存储 缓存 JSON
Netty - 粘包和半包(上)
Netty - 粘包和半包(上)
|
27天前
|
消息中间件 存储 网络协议
拼多多面试:Netty如何解决粘包问题?
粘包和拆包问题也叫做粘包和半包问题,**它是指在数据传输时,接收方未能正常读取到一条完整数据的情况(只读取了部分数据,或多读取到了另一条数据的情况)就叫做粘包或拆包问题。** 从严格意义上来说,粘包问题和拆包问题属于两个不同的问题,接下来我们分别来看。 ## 1.粘包问题 粘包问题是指在网络通信中,发送方连续发送的多个小数据包被接收方一次性接收的现象。这可能是因为底层传输层协议(如 TCP)会将多个小数据包合并成一个大的数据块进行传输,导致接收方在接收数据时一次性接收了多个数据包,造成粘连。 例如以下案例,正常情况下客户端发送了两条消息,分别为“ABC”和“DEF”,那么接收端也应该收到两
14 0
拼多多面试:Netty如何解决粘包问题?
|
10天前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
8 0
|
9天前
|
Java
Netty中粘包拆包问题解决探讨
Netty中粘包拆包问题解决探讨
2 0
|
1月前
|
网络协议 Java 物联网
Spring Boot与Netty打造TCP服务端(解决粘包问题)
Spring Boot与Netty打造TCP服务端(解决粘包问题)
172 1
|
1月前
|
JSON 移动开发 网络协议
数据拆散与黏连:深入剖析Netty中的半包与粘包问题
数据拆散与黏连:深入剖析Netty中的半包与粘包问题
38 0
|
1月前
|
编解码
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战_自定义长度分包编解码码器
56 0
|
1月前
|
网络协议
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
Netty Review - 优化Netty通信:如何应对粘包和拆包挑战
70 0