Netty分隔符和定长解码器使用

简介: TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式 消息长度固定,累计读取到长度总和为定长的LEN的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报。


 TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式

   消息长度固定,累计读取到长度总和为定长的LEN的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报。

   将回车换行符作为消息的结束标志,例如FTP协议,这种方式在文本协议中应用比较广泛;

   将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符

   通过在消息头中定义长度字段来标志消息的总长度。

 Netty对上面4中应用做了统一的抽象,提供了4中解码器来解决对应的问题,使用起来非常方便,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

DelimiterBasedFrameDecoder

 DelimiterBasedFrameDecoder可以帮助我们自动完成以分隔符作为码流结束标示的消息的解码。通过案例我们具体来看下,案例中以"$_"作为分隔符

服务端

EchoServer

package com.dpb.netty.demo3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * DelimiterBasedFrameDecoder案例
 *    服务端
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServer {
  public void bind(int port) throws Exception {
    // 配置服务端的NIO线程组
    // 服务端接受客户端的连接
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    // 进行SocketChannel的网络读写
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 100)
          .handler(new LoggingHandler(LogLevel.INFO))
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              // 1.定义分隔符
              ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
              // 2.添加分隔符解码器 单条消息最大长度1024,
              // 当到达长度后仍然没有查找到分隔符,就抛TooLongFrameException
              // 第二个参数是分隔符缓冲对象
              ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
              // 3.添加字符串处理解码器
              ch.pipeline().addLast(new StringDecoder());
              // 4.添加自定义的处理器
              ch.pipeline().addLast(new EchoServerHandler());
            }
          });
      // 绑定端口,同步等待成功
      ChannelFuture f = b.bind(port).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    } finally {
      // 优雅退出,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if(args!=null && args.length > 0){
      try{
        port = Integer.valueOf(args[0]);
      }catch(NumberFormatException e){
        // 采用默认值
      }
    }
    new EchoServer().bind(port);
  }
}

EchoServerHandler

package com.dpb.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * DelimiterBasedFrameDecoder 案例
 *    自定义处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoServerHandler extends ChannelHandlerAdapter{
  // 统计接收消息的数量
  private int counter;
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 获取客户端传递的消息
    String body = (String) msg;
    // 打印消息
    System.out.println("This is "+ ++counter + " times receive client :["+body+"]");
    // 分隔符已经被截取掉了,响应信息的时候我们需要再加上分隔符
    body += "$_";
    ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
    ctx.writeAndFlush(echo);
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close(); // 发生异常关闭链路
  }
}

客户断

EchoClient

package com.dpb.netty.demo3;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * DelimiterBasedFrameDecoder 案例 客户端
 * 
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoClient {
  public static void main(String[] args) throws Exception {
    int port = 8080;
    if (args != null && args.length > 0) {
      try {
        port = Integer.valueOf(args[0]);
      } catch (NumberFormatException e) {
        // 采用默认值
      }
    }
    new EchoClient().connector(port, "127.0.0.1");
  }
  public void connector(int port, String host) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap b = new Bootstrap();
      b.group(group).channel(NioSocketChannel.class)
              .option(ChannelOption.TCP_NODELAY, true)
              .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          // TODO Auto-generated method stub
          // 1.定义分隔符
          ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
          // 2.添加分隔符解码器
          ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
          // 3.添加字符串处理解码器
          ch.pipeline().addLast(new StringDecoder());
          // 4.添加自定义的处理器
          ch.pipeline().addLast(new EchoClientHandler());
        }
      });
      // 发起异步连接操作
      ChannelFuture f = b.connect(host, port).sync();
      // 等待客户端链路关闭
      f.channel().closeFuture().sync();
    } finally {
      // 优雅退出,释放NIO线程组
      group.shutdownGracefully();
    }
  }
}

EchoClientHandler

package com.dpb.netty.demo3;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * DelimiterBasedFrameDecoder 案例
 *  自定义客户端处理器
 * @author 波波烤鸭
 * @email dengpbs@163.com
 *
 */
public class EchoClientHandler extends ChannelHandlerAdapter{
  private int counter;
  static final String ECHO_REQ = "Hi , bobo烤鸭. Welcome to Netty.$_";
  public EchoClientHandler(){
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // TODO Auto-generated method stub
    for (int i = 0; i < 10; i++) {
      // 发送消息别立马刷新
      ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    }
  }
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // TODO Auto-generated method stub
    System.out.println("This is "+ ++counter + "time recevice server :【"+msg+"】");
  }
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // TODO Auto-generated method stub
    ctx.flush();
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // TODO Auto-generated method stub
    cause.printStackTrace();
    ctx.close();
  }
}

测试

服务端运行结果

image.png

客户端运行结果

image.png

 服务端成功接收到了客户端发送的10条信息,客户端成功接收到了服务端返回的10条信息,测试结果表明使用DelimiterBasedFrameDecoder可以自动对采用分隔符做码流结束标识的消息进行解码。运行多次的原因是模拟TCP粘包/拆包,如果没有DelimiterBasedFrameDecoder解码处理,服务端和客户端都将运行失败,如下:

image.png

image.png

输出结果:

This is 1 times receive client :
[Hi , bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_Hi , 
bobo烤鸭. Welcome to Netty.$_]

客户端发送的10条信息,在服务端粘包成一条信息了。那么响应信息肯定也是一条了。

FixedLengthFrameDecoder

 FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常使用。通过案例来演示。

服务端

 服务端还是用上个案例中的代码,我们需要调整两处地方

EchoServer

image.png

EchoServerHandler

image.png

客户端

 客户端不用处理,就用上面案例中的就可以,服务端不返回响应,所以客户端也不用处理响应的数据。直接运行客户端

image.png

总结

 DelimiterBasedFrameDecoder用于对使用分隔符结尾的信息进行自动解码,FixedLengthFrameDecoder用于对固定长度的消息进行自动解码,有了上述两种解码器再结合其他的解码器,如字符串解码器等,可以轻松地完成对很多消息自动解码,而且不再需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。


相关文章
|
4月前
|
移动开发 编解码 Java
Netty编码器和解码器
Netty从底层Java通道读到ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过Netty的Decoder解码器去完成。在出站处理过程中,业务处理后的结果需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层 Java通道发送到对端。在编码过程中,需要用到Netty的Encoder编码器去完成数据的编码工作。
|
3月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
45 0
|
3月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
46 0
|
数据格式
netty系列之:自定义编码解码器
netty系列之:自定义编码解码器
|
存储 算法 Java
netty系列之:netty中常用的对象编码解码器
我们在程序中除了使用常用的字符串进行数据传递之外,使用最多的还是JAVA对象。在JDK中,对象如果需要在网络中传输,必须实现Serializable接口,表示这个对象是可以被序列化的。这样就可以调用JDK自身的对象对象方法,进行对象的读写。 那么在netty中进行对象的传递可不可以直接使用JDK的对象序列化方法呢?如果不能的话,又应该怎么处理呢? 今天带大家来看看netty中提供的对象编码器。
|
XML JSON 数据格式
netty系列之:netty中常用的xml编码解码器
在json之前,xml是最常用的数据传输格式,虽然xml的冗余数据有点多,但是xml的结构简单清晰,至今仍然运用在程序中的不同地方,对于netty来说自然也提供了对于xml数据的支持。 netty对xml的支持表现在两个方面,第一个方面是将编码过后的多个xml数据进行frame拆分,每个frame包含一个完整的xml。另一方面是将分割好的frame进行xml的语义解析。 进行frame拆分可以使用XmlFrameDecoder,进行xml文件内容的解析则可以使用XmlDecoder,接下来我们会详细讲解两个decoder实现和使用。
|
移动开发 Unix Java
netty系列之:netty中常用的字符串编码解码器
字符串是我们程序中最常用到的消息格式,也是最简单的消息格式,但是正因为字符串string太过简单,不能附加更多的信息,所以在netty中选择的是使用byteBuf作为最底层的消息传递载体。 虽然底层使用的ByteBuf,但是对于程序员来说,还是希望能够使用这种最简单的字符串格式,那么有什么简单的方法吗?
|
移动开发 网络协议
深入理解 Netty-解码器架构与常用解码器 (二)
深入理解 Netty-解码器架构与常用解码器 (二)
109 0
|
编解码 容器
深入理解 Netty-解码器架构与常用解码器 (一)
深入理解 Netty-解码器架构与常用解码器 (一)
156 0
|
移动开发 安全
netty系列之:netty中的frame解码器
netty系列之:netty中的frame解码器
netty系列之:netty中的frame解码器