TCP以流的方式进行数据传输,上层的应用协议为了对消息进行区分,往往采用如下4种方式
消息长度固定,累计读取到长度总和为定长的LEN的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报。
将回车换行符作为消息的结束标志,例如FTP协议,这种方式在文本协议中应用比较广泛;
将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
通过在消息头中定义长度字段来标志消息的总长度。
Netty对上面4中应用做了统一的抽象,提供了4中解码器来解决对应的问题,使用起来非常方便,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder可以帮助我们自动完成以分隔符作为码流结束标示的消息的解码。通过案例我们具体来看下,案例中以"$_"作为分隔符
服务端
EchoServer
package com.dpb.netty.demo3; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * DelimiterBasedFrameDecoder案例 * 服务端 * @author 波波烤鸭 * @email dengpbs@163.com * */ public class EchoServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 // 服务端接受客户端的连接 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 进行SocketChannel的网络读写 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 1.定义分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // 2.添加分隔符解码器 单条消息最大长度1024, // 当到达长度后仍然没有查找到分隔符,就抛TooLongFrameException // 第二个参数是分隔符缓冲对象 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 3.添加字符串处理解码器 ch.pipeline().addLast(new StringDecoder()); // 4.添加自定义的处理器 ch.pipeline().addLast(new EchoServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if(args!=null && args.length > 0){ try{ port = Integer.valueOf(args[0]); }catch(NumberFormatException e){ // 采用默认值 } } new EchoServer().bind(port); } }
EchoServerHandler
package com.dpb.netty.demo3; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * DelimiterBasedFrameDecoder 案例 * 自定义处理器 * @author 波波烤鸭 * @email dengpbs@163.com * */ public class EchoServerHandler extends ChannelHandlerAdapter{ // 统计接收消息的数量 private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 获取客户端传递的消息 String body = (String) msg; // 打印消息 System.out.println("This is "+ ++counter + " times receive client :["+body+"]"); // 分隔符已经被截取掉了,响应信息的时候我们需要再加上分隔符 body += "$_"; ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); ctx.writeAndFlush(echo); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); // 发生异常关闭链路 } }
客户断
EchoClient
package com.dpb.netty.demo3; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * DelimiterBasedFrameDecoder 案例 客户端 * * @author 波波烤鸭 * @email dengpbs@163.com * */ public class EchoClient { public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new EchoClient().connector(port, "127.0.0.1"); } public void connector(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub // 1.定义分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // 2.添加分隔符解码器 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 3.添加字符串处理解码器 ch.pipeline().addLast(new StringDecoder()); // 4.添加自定义的处理器 ch.pipeline().addLast(new EchoClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } }
EchoClientHandler
package com.dpb.netty.demo3; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * DelimiterBasedFrameDecoder 案例 * 自定义客户端处理器 * @author 波波烤鸭 * @email dengpbs@163.com * */ public class EchoClientHandler extends ChannelHandlerAdapter{ private int counter; static final String ECHO_REQ = "Hi , bobo烤鸭. Welcome to Netty.$_"; public EchoClientHandler(){ } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub for (int i = 0; i < 10; i++) { // 发送消息别立马刷新 ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub System.out.println("This is "+ ++counter + "time recevice server :【"+msg+"】"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub cause.printStackTrace(); ctx.close(); } }
测试
服务端运行结果
客户端运行结果
服务端成功接收到了客户端发送的10条信息,客户端成功接收到了服务端返回的10条信息,测试结果表明使用DelimiterBasedFrameDecoder可以自动对采用分隔符做码流结束标识的消息进行解码。运行多次的原因是模拟TCP粘包/拆包,如果没有DelimiterBasedFrameDecoder解码处理,服务端和客户端都将运行失败,如下:
输出结果:
This is 1 times receive client : [Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_Hi , bobo烤鸭. Welcome to Netty.$_]
客户端发送的10条信息,在服务端粘包成一条信息了。那么响应信息肯定也是一条了。
FixedLengthFrameDecoder
FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常使用。通过案例来演示。
服务端
服务端还是用上个案例中的代码,我们需要调整两处地方
EchoServer
EchoServerHandler
客户端
客户端不用处理,就用上面案例中的就可以,服务端不返回响应,所以客户端也不用处理响应的数据。直接运行客户端
总结
DelimiterBasedFrameDecoder用于对使用分隔符结尾的信息进行自动解码,FixedLengthFrameDecoder用于对固定长度的消息进行自动解码,有了上述两种解码器再结合其他的解码器,如字符串解码器等,可以轻松地完成对很多消息自动解码,而且不再需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。