前言
TCP是面向连接的,服务端和客户端通过socket进行数据传输,发送端为了更有效的发送数据,通常会使用Nagle算法把多个数据块合并成一个大的数据块,这样做虽然提高了效率,但是接收端就很难识别完整的数据包了(TCP无消息保护边界),可能会出现粘包拆包的问题。
粘包拆包理解
下面我用一个图来带大家理解什么是粘包和拆包
解释一下
- 第一次传输没有问题,数据1和数据2没有粘合,也没有拆分
- 第二次传输,数据1和数据2粘在一起传输了,出现了粘包
- 第三次传输,数据2被分为了2部分,数据2_1 第一份和数据1粘在一起,数据2_2第二份单独传输,这里即出现了拆包也出现了粘包
粘包拆包代码演示
这里写一个简单案例来演示粘包拆包,客户端发送10个数据包,观察服务端是否做了10次读取,如果不是,就出现粘包或者拆包的情况,这里我们使用byte类型来传输案例如下。
第一步:编写Netty服务端
publicstaticvoidmain(String[] args) { NioEventLoopGroupbossGroup=newNioEventLoopGroup(); NioEventLoopGroupworkGroup=newNioEventLoopGroup(); ServerBootstrapbootstrap=newServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); //添加handlerpipeline.addLast(newServerHandler()); } }); try { ChannelFuturesync=bootstrap.bind(3000).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedExceptione) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
第二步:编写服务端handler
publicclassServerHandlerextendsSimpleChannelInboundHandler<ByteBuf> { //服务端接收次数privateintnum=0; protectedvoidchannelRead0(ChannelHandlerContextctx, ByteBufmsg) throwsException { System.out.println("接收消息,次数 = "+num++); //接收数据byte[] bytes=newbyte[msg.readableBytes()]; //把数据读到bytes中msg.readBytes(bytes); System.out.println(newString(bytes, CharsetUtil.UTF_8)); } }
这里定义了一个num来记录服务端数据读取次数。
第三步:定义Netty客户端
publicstaticvoidmain(String[] args) { NioEventLoopGroupeventLoopGroup=newNioEventLoopGroup(); Bootstrapbootstrap=newBootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); pipeline.addLast(newClientHandler()); } }); ChannelFuturesync=null; try { sync=bootstrap.connect("127.0.0.1", 3000).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedExceptione) { e.printStackTrace(); }finally { eventLoopGroup.shutdownGracefully(); } }
第四步:定义客户端的Handler
publicclassClientHandlerextendsSimpleChannelInboundHandler<ByteBuf> { protectedvoidchannelRead0(ChannelHandlerContextctx, ByteBufmsg) throwsException { } publicvoidchannelActive(ChannelHandlerContextctx) throwsException { //发送10个数据块for (inti=0; i<10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer("数据块"+i+";", CharsetUtil.UTF_8)); } } }
这里循环了10次,我发送了10个数据块
第五步:测试,启动服务端和客户端。观察控制台
问题比较明显,客户端发送了10次数据,服务端做了5次接收,第3次4次5次都出现了粘包的情况。
定义编码器解决粘包拆包问题
要解决粘包拆包的问题就要明确数据边界,尴尬的是面向流的通信是没有消息保护边界的。所以我们需要自定义传输协议来确定消息的边界,说的再直白一点就是我们如果能够明确服务端每次读取消息的长度,那就不会出现粘包拆包问题了。
如果要做到该效果,那么就需要自定义消息协议和编码解码器,我们先来处理客户端。
第一步:定义协议 , 指定消息长度和内容
//定义消息协议publicclassMsgProtocol { //内容长度privateintlen; //内容privatebyte[] data; publicMsgProtocol(intlen , byte[] data){ this.len=len; this.data=data; } publicMsgProtocol(){} publicintgetLen() { returnlen; } publicvoidsetLen(intlen) { this.len=len; } publicbyte[] getData() { returndata; } publicvoidsetData(byte[] data) { this.data=data; } }
第二步:客户端的handler发送MsgProtocol对象
publicclassClientHandlerextendsSimpleChannelInboundHandler<MsgProtocol> { publicvoidchannelActive(ChannelHandlerContextctx) throwsException { //发送10个数据块for (inti=0; i<10; i++) { Stringdata="数据块"+i; byte[] bytes=data.getBytes(CharsetUtil.UTF_8); //长度intlen=bytes.length; //构建一个MsgProtocol,并写去ctx.writeAndFlush(newMsgProtocol(len,bytes)); } } }
第三步:继承MessageToByteEncoder,自定义编码器 ,把消息的长度和内容写出去
//定义直接的编码器:MessageToByteEncoder 把Messsage转换成 bytepublicclassMessageEncoderextendsMessageToByteEncoder<MsgProtocol> { protectedvoidencode(ChannelHandlerContextctx, MsgProtocolmsg, ByteBufout) throwsException { //这里需要把内容的长度写给服务端out.writeInt(msg.getLen()); //把内容写给服务端out.writeBytes(msg.getData()); } }
第四步:客户端指定编码器
publicstaticvoidmain(String[] args) { NioEventLoopGroupeventLoopGroup=newNioEventLoopGroup(); Bootstrapbootstrap=newBootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); //加入自定义的编码器pipeline.addLast(newMessageEncoder()); pipeline.addLast(newClientHandler()); } }); ChannelFuturesync=null; try { sync=bootstrap.connect("127.0.0.1", 3000).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedExceptione) { e.printStackTrace(); }finally { eventLoopGroup.shutdownGracefully(); } }
客户端的工作完成了,接下来我们处理服务端
第一步:编写解码器,需要把byte数据封装成MsgProtocol
//定义解码器,拿到数据长度和内容转换成MsgProtocol,交给handler处理publicclassMessageDecoderextendsReplayingDecoder<Void> { protectedvoiddecode(ChannelHandlerContextctx, ByteBufin, List<Object>out) throwsException { //拿到数据的长度intlen=in.readInt(); //拿到数据的内容byte[] bytes=newbyte[len]; in.readBytes(bytes); //把解码后的数据交给下一个handlerout.add(newMsgProtocol(len,bytes)); } }
ReplayingDecoder就是对ByteToMessageDecoder的 扩展和简化
第二步:服务端handler,这里接收的是MsgProtocol消息对象
publicclassServerHandlerextendsSimpleChannelInboundHandler<MsgProtocol> { //服务端接收次数privateintnum=0; protectedvoidchannelRead0(ChannelHandlerContextctx, MsgProtocolmsg) throwsException { System.out.println("接收消息,次数 = "+num++); //接收数据System.out.println(newString(msg.getData(), CharsetUtil.UTF_8)); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) throwsException { ctx.channel().close(); } }
第三步:服务端指定解码器
publicstaticvoidmain(String[] args) { NioEventLoopGroupbossGroup=newNioEventLoopGroup(); NioEventLoopGroupworkGroup=newNioEventLoopGroup(); ServerBootstrapbootstrap=newServerBootstrap(); bootstrap.group(bossGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(newChannelInitializer<SocketChannel>() { protectedvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); //添加解码器pipeline.addLast(newMessageDecoder()); pipeline.addLast(newServerHandler()); } }); try { ChannelFuturesync=bootstrap.bind(3000).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedExceptione) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
到这里服务端编写完成,接下来依次启动服务端和客户端进行测试,效果如下
可以看到,客户端发送了10次,服务器接收了10次,没有出现粘包拆包的情况了。所以问题的关键就是服务端解码器中需要明确消息的长度,就能够明确每次消息读取的边界,就不会出问题了。