Netty的粘包问题
在网络通信中,粘包问题是一种常见的情况,特别是在使用TCP协议进行数据传输时。本文将探讨Netty中的粘包问题及其解决方案,帮助读者更好地理解和应对这一问题。
1. 粘包问题介绍
1.1 什么是粘包问题
粘包问题是指发送端在将多个数据包连续发送到接收端时,接收端在接收数据时可能会将多个数据包粘合在一起,导致数据解析错误或丢失的情况。
1.2 粘包问题的影响
粘包问题可能导致接收端无法准确解析数据,从而影响系统的正常运行,甚至引发数据错误或丢失。
2. 粘包问题的原因
2.1 TCP协议的特点
TCP协议是面向连接的、可靠的数据传输协议,其特性会导致粘包问题的产生。
2.2 发送端造成的粘包
发送端在短时间内连续发送多个数据包时,可能会导致多个数据包在传输过程中被合并成一个数据包。
2.3 接收端造成的粘包
接收端在接收数据时,可能会将多个数据包一次性读取到缓冲区,造成粘包现象。
3. 粘包问题的解决方案
3.1 固定长度消息
通过设置固定长度的消息格式,使得每个数据包的长度固定,从而解决粘包问题。
服务端代码示例
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
服务端完整代码
// 示例代码 /** * @description: 固定长度方式解决粘包 问题服务端示例 * @author: xz */ @Slf4j public class NettyFixLengthServer { public static void main(String[] args) { new NettyFixLengthServer().start(); } void start() { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .channel(NioServerSocketChannel.class) //调整 netty 的接受缓冲区(byteBuf) .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16)) .group(boss, worker) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //设置定长解码器,位置必须再LoggingHandler之前,作用让所有数据包长度固定(假设长度为 16 字节) ch.pipeline().addLast(new FixedLengthFrameDecoder(16)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //会在连接channel建立成功后,触发active事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("connected>>>>>>>>>>>>>>>> {}", ctx.channel()); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.debug("disconnect>>>>>>>>>>>>>>>> {}", ctx.channel()); super.channelInactive(ctx); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080); log.debug("{}>>>>>>>>>>>>>>>> binding...", channelFuture.channel()); channelFuture.sync(); log.debug("{}>>>>>>>>>>>>>>>> bound...", channelFuture.channel()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); log.debug(">>>>>>>>>>>>>>>>stoped"); } } }
客户端代码
/** * @description: 固定长度方式解决粘包 问题客户端示例 * @author: xz */ @Slf4j public class NettyFixLengthClient { public static void main(String[] args) { send(); } //剩余位置用下划线填充 public static byte[] fill10Bytes(char c,int len){ byte[] bytes = new byte[16]; Arrays.fill(bytes, (byte) '_'); for (int i = 0; i < len; i++) { bytes[i] = (byte) c; } System.out.println(new String(bytes)); return bytes; } private static void send() { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.debug("connetted..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); //设置ByteBuf ByteBuf buffer = ctx.alloc().buffer(); // 发送内容随机的数据包 Random r = new Random(); char c = '0'; for (int i = 0; i < 10; i++) { //剩余位置用下划线填充方法 byte[] bytes =fill10Bytes(c,r.nextInt(16)+1); c++; //写入到ByteBuf buffer.writeBytes(bytes); } ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } }
3.2 分隔符消息
在数据包之间添加特定的分隔符,通过分隔符来区分不同的数据包,从而解决粘包问题。
String recieveStr = ""$$$$这是一条粘包的消息&&&&$$$$这是一条粘包的消息&&&&""; if (recieveStr.contains("&&&&$$$$")) { log.debug("发生粘包"); //去掉首尾的分隔符 recieveStr = recieveStr.substring(4, recieveStr.length() - 4); //将分离好的消息写入字符串数组 String[] totalBag = recieveStr.split("&&&&$$$$"); for (String thisTotalBag : totalBag) { //每条消息的业务逻辑。。。 } }
3.3 消息头+消息体
在消息中添加头部信息,包括消息长度等信息,接收端根据头部信息来解析数据包,从而解决粘包问题。
3.4 使用消息结束标志
在每个数据包的末尾添加特定的结束标志,接收端根据结束标志来识别数据包的边界,从而解决粘包问题。netty使用tcp/ip协议传输数据。而tcp/ip协议是类似水流一样的数据传输方式。多次访问的时候有可能出现数据粘包的问题。客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘KaTeX parse error: Expected group after '_' at position 1: _̲’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。bootstrap.childHandler()方法,在该方法中,定义了一个ChannelHandler[] acceptorHandlers = new ChannelHandler[3];数组的三个元素分别对应:自定义结束符、自定义编码、自定义处理器。最后,将这个数组作为参数进行传递。
- 服务端代码
public class Server4Delimiter { // 监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; // 处理客户端相关操作线程组,负责处理与客户端的数据通讯 private EventLoopGroup clientGroup = null; // 服务启动相关配置信息 private ServerBootstrap bootstrap = null; public Server4Delimiter(){ init(); } private void init(){ acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); // 绑定线程组 bootstrap.group(acceptorGroup, clientGroup); // 设定通讯模式为NIO bootstrap.channel(NioServerSocketChannel.class); // 设定缓冲区大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024) .option(ChannelOption.SO_RCVBUF, 16*1024) .option(ChannelOption.SO_KEEPALIVE, true); } public ChannelFuture doAccept(int port) throws InterruptedException{ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。 ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes()); ChannelHandler[] acceptorHandlers = new ChannelHandler[3]; // 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰, // 必须每次初始化通道时创建一个新对象 // 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。 acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8")); acceptorHandlers[2] = new Server4DelimiterHandler(); ch.pipeline().addLast(acceptorHandlers); } }); ChannelFuture future = bootstrap.bind(port).sync(); return future; } public void release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args){ ChannelFuture future = null; Server4Delimiter server = null; try{ server = new Server4Delimiter(); future = server.doAccept(9999); System.out.println("server started."); future.channel().closeFuture().sync(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != server){ server.release(); } } } } public class Server4DelimiterHandler extends ChannelHandlerAdapter { // 业务处理逻辑 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = msg.toString(); System.out.println("from client : " + message); String line = "server message $E$ test delimiter handler!! $E$ second message $E$"; ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); } // 异常处理逻辑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
- 客户端代码
public class Client4Delimiter { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4Delimiter(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port) throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 数据分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes()); ChannelHandler[] handlers = new ChannelHandler[3]; handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 handlers[1] = new StringDecoder(Charset.forName("UTF-8")); handlers[2] = new Client4DelimiterHandler(); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Delimiter client = null; ChannelFuture future = null; try{ client = new Client4Delimiter(); future = client.doRequest("localhost", 9999); Scanner s = null; while(true){ s = new Scanner(System.in); System.out.print("enter message send to server > "); String line = s.nextLine(); future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); TimeUnit.SECONDS.sleep(1); } }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } } public class Client4DelimiterHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ String message = msg.toString(); System.out.println("from server : " + message); }finally{ // 用于释放缓存。避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
3.5 使用消息定长协议
利用Netty提供的LengthFieldBasedFrameDecoder等解码器,处理定长消息,从而解决粘包问题。
public class Client4Delimiter { // 处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; // 服务启动相关配置信息 private Bootstrap bootstrap = null; public Client4Delimiter(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); // 绑定线程组 bootstrap.group(group); // 设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port) throws InterruptedException{ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 数据分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes()); ChannelHandler[] handlers = new ChannelHandler[3]; handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter); // 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象 handlers[1] = new StringDecoder(Charset.forName("UTF-8")); handlers[2] = new Client4DelimiterHandler(); ch.pipeline().addLast(handlers); } }); ChannelFuture future = this.bootstrap.connect(host, port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] args) { Client4Delimiter client = null; ChannelFuture future = null; try{ client = new Client4Delimiter(); future = client.doRequest("localhost", 9999); Scanner s = null; while(true){ s = new Scanner(System.in); System.out.print("enter message send to server > "); String line = s.nextLine(); future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))); TimeUnit.SECONDS.sleep(1); } }catch(Exception e){ e.printStackTrace(); }finally{ if(null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if(null != client){ client.release(); } } } } public class Client4DelimiterHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ String message = msg.toString(); System.out.println("from server : " + message); }finally{ // 用于释放缓存。避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exceptionCaught method run..."); // cause.printStackTrace(); ctx.close(); } }
4. Netty中的粘包问题与解决方案
4.1 Netty中的粘包问题
分析Netty中可能出现的粘包问题。
4.2 Netty中的解决方案
介绍Netty提供的解决粘包问题的方案,如DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等。
5. 注意事项与总结
5.1 注意事项
在处理粘包问题时需要注意的事项。
5.2 总结
总结解决粘包问题的方法及其优缺点,以及在实践中的应用。
通过本文的学习,读者可以更深入地了解Netty中的粘包问题及其解决方案,从而更好地应对实际开发中的网络通信挑战。