前提
Netty(一)之helloworld
数据的粘包
在上面例子的基础之上,对TimeClient进行修改
我们的本意是发送三条您好
//发送数据 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); //Thread.sleep(1000);//防止TCP粘包 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); //Thread.sleep(1000); f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); //Thread.sleep(1000);
数据粘在一起了,就是数据的粘包
初步解决办法
//发送数据 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); Thread.sleep(1000);//防止TCP粘包 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); Thread.sleep(1000); f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes())); Thread.sleep(1000);
分隔符解码器DelimiterBasedFrameDecoder
TimeServerHandler
package mydelimiterBasedFrameDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author CBeann * @create 2019-08-27 18:31 */ public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //服务器读客户端发送来的数据 String body = (String) msg;//之所以能强制类型转换是因为添加了StringDecoder System.out.println("The TimeServer receive :" + body); //服务器向客户端回应请求 String resp = "服务器端已经收到客户端数据$_";//$_结尾符号 ByteBuf response = Unpooled.copiedBuffer(resp.getBytes()); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } // }
TimeServer
package mydelimiterBasedFrameDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * @author CBeann * @create 2019-08-27 18:22 */ public class TimeServer { public static void main(String[] args) throws Exception { int port = 8080; //配置服务器端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //设置特殊分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); //设置字符串编码器,使其在handler中的msg可以直接强转为String socketChannel.pipeline().addLast(new StringDecoder()); //TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new TimeServerHandler()); } }); //绑定端口 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (Exception e) { } finally { //优雅关闭,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
TimeClientHandler
package mydelimiterBasedFrameDecoder; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; /** * @author CBeann * @create 2019-08-27 18:47 */ public class TimeClientHandler extends ChannelHandlerAdapter { //客户端读取服务器发送的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String body = (String) msg; System.out.println("Now is:" + body); } catch (Exception e) { } finally { //标配 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
TimeClient
package mydelimiterBasedFrameDecoder; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * @author CBeann * @create 2019-08-27 18:43 */ public class TimeClient { public static void main(String[] args) throws Exception { int port = 8080; String host = "127.0.0.1"; //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //设置特殊分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); //设置字符串编码器,使其在handler中的msg可以直接强转为String socketChannel.pipeline().addLast(new StringDecoder()); //TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new TimeClientHandler()); } }); //发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); //发送数据 f.channel().writeAndFlush(Unpooled.copiedBuffer("您好$_".getBytes())); f.channel().writeAndFlush(Unpooled.copiedBuffer("您好$_".getBytes())); f.channel().writeAndFlush(Unpooled.copiedBuffer("您好$_".getBytes())); //等待客户端链路关闭 f.channel().closeFuture().sync(); } catch (Exception e) { } finally { //优雅关闭 group.shutdownGracefully(); } } }
运行结果
定长解码器FixedLengthFrameDecoder
TimeServerHandler
读取数据,响应请求
package myfixedlengthframedecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author CBeann * @create 2019-08-27 18:31 */ public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //服务器读客户端发送来的数据 String body = (String) msg; System.out.println("The TimeServer receive :" + body); //服务器向客户端回应请求 ByteBuf response = Unpooled.copiedBuffer("1234".getBytes()); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } // }
TimeServer
添加定长解码器
添加字符串解码器
package myfixedlengthframedecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * @author CBeann * @create 2019-08-27 18:22 */ public class TimeServer { public static void main(String[] args) throws Exception { int port = 8080; //配置服务器端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //添加固定长度解码器, 固定长度为5 socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5)); socketChannel.pipeline().addLast(new StringDecoder()); //TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new TimeServerHandler()); } }); //绑定端口 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (Exception e) { } finally { //优雅关闭,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
TimeClientHandler
接收服务器发来的数据
package myfixedlengthframedecoder; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; /** * @author CBeann * @create 2019-08-27 18:47 */ public class TimeClientHandler extends ChannelHandlerAdapter { //客户端读取服务器发送的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String body = (String) msg; System.out.println("Now is:" + body); } catch (Exception e) { } finally { //标配 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
TimeClient
添加定长解码器
添加字符串解码器
发送长度为10的数据(定长解码器设置长度为5)
package myfixedlengthframedecoder; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * @author CBeann * @create 2019-08-27 18:43 */ public class TimeClient { public static void main(String[] args) throws Exception { int port = 8080; String host = "127.0.0.1"; //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5)); socketChannel.pipeline().addLast(new StringDecoder()); //TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new TimeClientHandler()); } }); //发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); //发送数据 f.channel().writeAndFlush(Unpooled.copiedBuffer("0123456789".getBytes())); //等待客户端链路关闭 f.channel().closeFuture().sync(); } catch (Exception e) { } finally { //优雅关闭 group.shutdownGracefully(); } } }
运行结果
客户端发送了一个长度为10的字符串,因为设置了长度为5的定长解码器,所以服务器收到2条消息