5.Netty数据传输编解码
5.1.什么是编码、解码
高性能RPC框架的三个要素:IO模型、数据协议、线程模型
最开始接触的编码:Java序列化/反序列化、URL编码、base64编解码
java自带序列化的缺点:
无法跨语言
序列化后的码流太大,也就是数据报太大
序列化和反序列化性能比较差
业界里面也有其他编码框架:
ProtoBuf(PB):ProtoBuf是google的一个结构数据序列化方法框架,可简单类比XML,语言无关、平台无关,支持java、c、python等多种语言,高效,比XML更小,扩展性、兼容性好。
Trift:Facebook下的一款编解码框架,thrift可以支持多种程序语言,在多种不同的语言之间通信thrift可以作为二进制的高性能的通讯中间件,支持数据(对象)序列化和多种类型的RPC服务。Thrift适用于程序对程序静态的数据交换,需要先确定好他的数据结构,他是完全静态化的,当数据结构发生变化时,必须重新编辑IDL文件。
Netty里面的编解码
解码器:负责处理“入站 InboundHandler”数据
编码器:负责处理“出站 OutboundHandler”数据
Netty里面提供默认的编解码器,也支持自定义编解码器
Encoder:编码器
Decoder:解码器
Codec:编解码器
5.2.Netty解码器之Decoder
Netty提供了丰富的节码器抽象基类,我们可以很容易的实现这些基类来实现自定义的解码器。
- 解码字节到消息:ByteToMessageDecoder和ReplayingDecoder
- 解码消息到消息:MessageToMessageDecoder
decoder负责将“入站”数据从一种格式转换成另一种格式,Netty的节码是一种
ChannelInboundHandler的抽象实现。实践中使用解码器很简单,就是将入站数据转换格式后传递到ChannelPipeline中的下一个ChannelInboundHandler进行处理,这样的处理是很灵活的,我们可以将解码器放在ChannelPipeline中,重用逻辑。
1、ByteToMessageDecoder
ByteToMessageDecoder是用于将字节转为消息(或其他字节序列)
你不能确定远端是否会一次发送完一个完整的“消息”,因此这个类会缓存入站的数据,直到准备好了用于处理。
方法名称 | 描述 |
decode | 它是用一个ByteBuf调用的,ByteBuf包含传入的字节和一个添加解码消息的列表。重复调用decode(),直到返回时列表为空。然后将列表的内容传递给管道中的下一个处理程序。 |
decodeLast | 提供的默认实现只调用decode()。当通道处于非活动状态时,此方法只调用一次。覆盖以提供特殊的。 |
假如我们接收了一个包含简单整数的字节流,每个都要单独处理,,我们将从入站 ByteBuf 读取每个整数并将其传递给 pipeline 中的下一个ChannelInboundHandler。“解码”字节流成整数我们将扩展ByteToMessageDecoder,实现类为“ToIntegerDecoder”。
每次从入站的ByteBuf读取四个字节,解码成整型,并添加到一个List,当不能在添加数据到List中时,它所包含的内容就会被发送到下一个ChannelInboudnHandler。
(1)继承ByteToMessageDecoder实现decode方法
(2)检查可读的字节是否少于4个(int类型是四个字节长度)
(3)从入站ByteBuf读取int,添加到节码消息的List中
尽管ByteToMessageDecoder简化了这个模式,但是在实际操作中(readInt()之前),必须要验证下ByteBuf要有足够的数据。
2、ReplayingDecoder
ReplayingDecoder是ByteToMessageDecoder的一个实现类,读取缓存中数据之前需要先检查下缓存中数据是否有足够字节,使用ReplayingDecoder就无需自己检查,若ByteBuf中有足够的字节,则会正常读取,若没有足够的字节则会停止解码。
正因为ReplayingDecoder是ByteToMessage的包装类,所以它会带有一定的局限性:
- 不是所有的标准ByteBuf操作都被支持,如果调用一个不支持的操作会抛出
- UnreplayableOperationException
- ReplayingDecoder性能慢于ByteToMessageDecoder
- 如果这些局限性是你可以接受的,那么你可以使用ReplayingDecoder,相反,如果没有引入过多的复杂性,使用ByteToMessageDecoder更优。
(1)继承ReplayingDecoder用于将字节码转换为消息
(2)从入站的ByteBuf中读取整型,并添加到节码消息的List中
3、MessageToMessageDecoder
用于从一种消息解码成另一种消息(例如:POIO到POJO)
将Integer转换为String,我们自定义IntegerToStringDecoder,继承自MessageToMessageDecoder。
也就是说,入站消息是按照在类定义中声明的参数类型(这里是 Integer) 而不是 ByteBuf来解析的。在之前的例子,解码消息(这里是String)将被添加到List,并传递到下个 ChannelInboundHandler。
代码实现:
(1)实现继承自 MessageToMessageDecoder
(2)转换消息为字符串,加到节码队列中
4、解码时太大的帧处理
Netty是异步框架需要缓冲区字节在内存中,直到你能够节码它们。一次,不能让解码器缓存太多的数据以免耗尽可用内存。为了解决这个问题,Netty提供了一个TooLongFrameException,通常由解码器在帧时间过长抛出。
TooLongFrameException 抛出(并由 ChannelHandler.exceptionCaught() 捕获)。然后由译码器的用户决定如何处理它。虽然一些协议,比如 HTTP、允许这种情况下有一个特殊的响应,有些可能没有,事件唯一的选择可能就是关闭连接。ByteToMessageDecoder 可以利用 TooLongFrameException 通知其他 ChannelPipeline 中的 ChannelHandler。
(1)实现继承 ByteToMessageDecoder 来将字节解码为消息
(2)检测缓冲区数据是否大于 MAX_FRAME_SIZE
(3)忽略所有可读的字节,并抛出 TooLongFrameException 来通知 ChannelPipeline 中的 ChannelHandler 这个帧数据超长
5、Netty中常用的几种解码器
LineBasedFrameDecoder
DelimiterBaesdFrameDecoder
FixedLengthFrameDecoder
StringDecoder
(1)LineBasedFrameDecoder
LineBasedFrameDecoder行解码器,遍历ByteBuf中可读字节,按行(\n \r\n)处理。
(2)StringDecoder
StringDecoder将接收的码流转化为字符串
代码中使用
(3)DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder,将特定分隔符作为码流结束标志的解码器。
- 代码中使用
(4)FixedLengthFrameDecoder
FixedLengthFrameDecoder固定长度节码器,只会读取指定长度的码流。
- 代码中使用
5.3.Netty编码器之Encoder
Encoder是用来把出站数据从一种格式转换成另外一种格式,因此它实现了ChannelOutboundHandler。就像Decoder一样,Netty也为你提供了一组类来写Encoder,当然这些提供的是与Decoder完全相反的方法,如下所示:
- 编码从消息到字节
- 编码从消息到消息
1、MessageToByteEncoder
这个类只有一个方法,而Decoder却有两个,原因就是Decoder经常需要在Channel关闭时产生一个“最后的消息”。出于这个原因,提供了decodeLast(),而Encoder没有这个需求。
方法名称 | 描述 |
encode | encode方法是您需要实现的唯一抽象方法。它是通过出站消息调用的,这个类将把出站消息编码为ByteBuf。然后将ByteBuf转发到ChannelPipeline中的下一个ChannelOutboundHandler。 |
下图实例,我们想生产值,并将他们编码成ByteBuf来发送到线上,我们提供了ShortToByteEncoder来实现该目的。
上图展示了,Encoder收到了Short消息,进行编码,并把它们写入ByteBuf。ByteBuf接着前面进到下一个pipeline的ChannelOutboundHandler。每个 Short 将占用 ByteBuf 的两个字节。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2ZqO6x78-1667215373979)(images/5.4(2).jpg)]
(1)实现继承自 MessageToByteEncoder
(2)写 Short 到 ByteBuf
Netty 提供很多 MessageToByteEncoder 类来帮助你的实现自己的 encoder 。其中 WebSocket08FrameEncoder 就是个不错的范例。
2、MessageToMessageEncoder
我们已经知道了如何将入站数据从一个消息格式解码成另一个格式。现在我们需要一种方法来将出站数据从一种消息编码成另一种消息。MessageToMessageEncoder 提供此功能,同样的只有一个方法,因为不需要产生“最后的消息”。
下面例子,我们将要解码 Integer 消息到 String 消息。可简单使用 MessageToMessageEncoder。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7DHocuyE-1667215373980)(images/5.4(3).jpg)]
encoder 从出站字节流提取 Integer,以 String 形式传递给ChannelPipeline 中的下一个 ChannelOutboundHandler 。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C641NNQq-1667215373980)(images/5.4(4).jpg)]
(1)实现继承自 MessageToMessageEncoder
(2)转 Integer 为 String,并添加到 MessageBuf
5.4.Netty编解码器之Codec
我们在讨论解码器和编码器的时候,都是把它们当成不同的实体的,但是有时候如果在同一个类中同时放入入站和出站的数据和信息转换的话,发现会更加实用。而Netty中的抽象Codec(变解码器)类就能达到这个目的,它们成对的组合解码器和编码器,以此提供对于字节和消息都相同的操作(这些类实现了ChannelInboundHandler和ChannelOutboundHandler)。
1、ByteToMessageCodec
我们需要解码字节到消息,也许是一个POJO,然后转回来,ByteToMessageCodec将为我们处理这个问题,因为他结合了ByteToMessageDecoder和MessageToByteEncoder。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qHTzcVX4-1667215373980)(images/5.5(1).jpg)]
类的继承图中我们可以看出,ByteToMessageCodec继承自ChannelDuplexHandler,ChannelDuplexHandler继承自ChannelInboundHandlerAdapter,实现于ChannelOutboundHandler接口,前面我们知道ByteToMessageDecoder继承ChannelInboundHandlerAdapter,MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。所以ByteToMessageCodec兼顾编码、解码的功能。
2、MessageToMessageCodec
和ByteToMessageCodec一样。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gapS6SNb-1667215373981)(images/5.5(2).jpg)]
3、编解码器的优缺点
- 优点:成对出现,编解码都是在一个类里面完成
- 缺点:耦合在一起,扩展性不佳
6.Netty网络传输TCP粘包拆包
6.1.TCP粘包拆包讲解
1、TCP粘包、TCP拆包
TCP粘包就是指发送方发送的若干包数据到达接收方时粘成一个包,从接收缓冲区来看,后一包数据的头紧接着前一包数据的尾,出现粘包的原因是多方面的,可能是来自发送方,也可能来自接收方。
2、出现TCP粘包的原因
(1)发送方原因
TCP默认使用Nagle算法(主要作用:减少网络中报文段的数量),而Nagle算法主要做两件事:
- 只有上一个分组得到确认,才会发送下一个分组
- 收集多个小分组,在一个确认到来时一起发送
Nagle算法造成了发送方可能会出现粘包问题
Nagle算法是指发送方发送的数据不会立即发出,而是先放在缓冲区,等待缓冲区满了在发出,发送完一批数据后,会等待接收方对这批数据的回应,然后在发送下一批数据。Nagle算法适用于发送方需要发送大批量数据,并且接收方会及时做出回应的场合,这种算法通过减少传输数据的次数来提高通信效率。
(2)接收方原因
TCP接收到数据包时,并不会马上交到应用层进行处理,或者说应用层并不会立即处理。实际上,TCP将收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到的分组。这样一来,如果TCP接收数据包到缓存的速度大于应用程序从缓存中读取数据包的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。
3、什么时候需要处理粘包现象
如果发送方发送的多组数据本来就是同一块数据的不同部分,比如说一个文件被分成多个部分发送,这时当然不需要处理粘包现象。
如果多个分组毫不相干,甚至是并列关系,那么这个时候就一定要处理粘包现象了。
4、如何处理粘包现象
(1)发送方
对于发送方造成的粘包问题,可以通过关闭Nagle算法来解决,使用TCP_NODELAY选项来关闭。
(2)接收方
接收方没有办法来处理粘包现象,只能将问题交给应用层来处理。
(3)应用层
应用层的解决办法简单可行,不仅能解决接收方的粘包问题,还可以解决发送方的粘包问题。
解决办法:循环处理,应用程序从接收缓存中读取分组时,读完一条数据,就应该循环读取下一条数据,直至所有数据都别处理完成。
如何判断每条数据的长度呢?
格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行,但是选择开始符和结束符时一定要确保每条数据的内部不包含开始符和结束符。
发送长度:发送每条数据时,将数据的长度一并发送,例如规定数据的前4位是数据的长度,应用层在处理时可以根据长度来判断每个分组的开始和结束位置。
5、UDP不会产生粘包问题
TCP为例保证可靠性传输并减少额外的开销(每次发包都要验证),采用了基于流的传输,基于流的传输不认为消息是一条一条的,是无保护消息边界的(保护消息边界:指传输协议把数据当做一条独立的消息在网上传输,接收端一次只能接受一条独立的消息)。
UDP则是面向消息传输的,是有保护消息边界的,接收方一次只接受一条独立的信息,所以不存在粘包问题。
UDP不存在粘包问题,是由于UDP发送的时候,没有经过Negal算法优化,不会将多个小包合并一次发送出去。另外,在UDP协议的接收端,采用了链式结构来记录每一个到达的UDP包,这样接收端应用程序一次recv只能从socket接收缓冲区中读出一个数据包。也就是说,发送端send了几次,接收端必须recv几次(无论recv时指定了多大的缓冲区)。
举个例子:有三个数据包,大小分别为2k、4k、6k,如果采用UDP发送的话,不管接受方的接收缓存有多大,我们必须要进行至少三次以上的发送才能把数据包发送完,但是使用TCP协议发送的话,我们只需要接受方的接收缓存有12k的大小,就可以一次把这3个数据包全部发送完毕。
6、TCP拆包
TCP拆包就是一个完整的包可能会被TCP拆分为多个包进行发送。
发生拆包的原因:
- 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
- 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
TCP拆包同样可以通过添加边界信息或者数据报长度信息来解决。
6.2.半包读写常见解决方案
- 发送方:关闭Nagle算法
- 接受方:TCP是无界的数据流,并没有处理粘包现象的机制,且协议本身无法避免粘包,可以在应用层处理。
- 应用层:
- 设置定长消息(24个字符)
- 设置消息的边界($_切割)
- 使用带消息头的协议,消息头存储消息开始标识及消息的长度信息(Header+Body)
6.3.Netty自带解决TCP半包读写方案
- DelimiterBasedFrameDecoder:指定消息分隔符的解码器
- LineBaseFrameDecoder:以换行符为结束标志的解码器
- FixedLengthFrameDecoder:固定长度解码器
- LengthFieldBasedFrameDecoder:message=header+body,基于长度解码的通用解码器
6.4.半包读写问题案例
(1)EchoServer编写
//创建启动引导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //加入服务端线程组 serverBootstrap.group(bossGroup,workGroup) //设置管道 .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) //加入处理器 .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { //加入处理器ServerHandler socketChannel.pipeline().addLast(new ServerHandler()); } }); System.out.println("Echo服务启动中...");
(2)ServerHandler处理器编写
private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String body = new String(bytes,"UTF-8").substring(0,bytes.length - System.getProperty("line.separator").length()); System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter); }
(3)测试
6.4.空格解码器案例
LineBasedFrameDecoder
(1)EchoServer编写
//加入服务端线程组 serverBootstrap.group(bossGroup,workGroup) //设置管道 .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) //加入处理器 .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); //String解码器,InboundHandler接收到的消息能只直接转换成String类型 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); System.out.println("Echo服务启动中...");
(2)ServerHandler处理器
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("服务端收到消息内容为:"+body+",收到消息次数:"+ ++counter); }
(3)客户端都一样,测试
6.5.自定义解码器案例
DelimiterBasedFrameDecoder
(1)EchoClient编写
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "11111111111111111&_222222222222222222222&_33333333333333333333&_444444444444444444444444&_"; ByteBuf msg = null; msg = Unpooled.buffer(message.getBytes().length); msg.writeBytes(message.getBytes()); ctx.writeAndFlush(msg); }
(2)ServerHandler编写
//加入服务端线程组 serverBootstrap.group(bossGroup,workGroup) //设置管道 .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) //加入处理器 .childHandler(new ChannelInitializer<SocketChannel>() { //指定分隔符为"&_" ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); //构建DelimiterBasedFrameDecoder处理器 //1024参数为,当没有截取到换行符时,但是字节已经超过1024个,就会抛异常TooLongFrameException socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }); System.out.println("Echo服务启动中...");
(3)测试
7.Netty核心功能之Buffer
7.1.Netty中BufferAPI
Buffer API主要包括
- ByteBuf
- ByteBufHolder
Netty根据reference-counting(引用计数)来确定何时可以释放ByteBuf或ByteBufHolder和其他相关资源,从而可以利用池和其他技巧来提高性能和降低内存的消耗。这一点上不需要开发人员做任何事情,但是在开发Netty应用程序时,尤其是使用ByteBuf和ByteBufHolder时,你应该尽早的释放资源。Netty缓冲API提供了几个优势:
- 可以自定义缓冲类型
- 扩展性好,比如StringBuilder
- 通过一个内置的复合缓冲类型实现零拷贝
- 不需要调用flip()来切换读写/模式
- 读取和写入索引分开
- 方法链
- 引用计数器
- Pooling(池)
7.2.Netty字节数据容器ByteBuf
既然所有的网络通信都是基于底层的字节流来传输,那么传输所使用的数据接口就要求是效率高的、使用方便的,Netty的ByteBuf更好的能达到这些需求。
ByteBuf是一个已经经过优化的很好的使用的数据容器,字节数据可以有效的被添加到ByteBuf中或者也可以从ByteBuf中之直接获取数据。ByteBuf中有两个索引,一个用来读,一个用来写。这两个索引达到了便于操作的目的。我们可以按照顺序的读取数据,也可以通过调整读取数据的索引或者直接将读取位置索引作为参数传递给get方法。而JDK中的ByteBuffer共用读写索引,每次读写操作都需要Flip()(复位操作)扩容麻烦,而且孔融后容易造成浪费。
1、ByteBuf的工作原理
写入数据到ByteBuf后,**writerIndex(写入索引)**增加写入的字节数。读取字节后,**readerIndex(读取索引)**也增加读取出的字节数。你可以读取字节,直到写入索引和读取索引位置相同时,此时ByteBuf不可读,所以下一次操作将会抛出IndexOutOfBoundsException,就像读取数组时越位一样。
调用ByteBuf的以“read”或“write”开头的任何方法都将自动增加相应的索引。另一方面“set”、“get”操作字符将不会移动索引位置,他们只会在指定的相对位置上操作字节。
可以给ByteBuf指定一个最大容量值,这个值限制这ByteBuf的容量,任何尝试将写入超过这个值的数据的行为都将导致抛出异常。ByteBuf的默认最大容量限制是Integer.MAX_VALUE。
ByteBuf类似于一个字节数组,最大的区别是读和写的索引可以来控制对缓冲区数据的访问。
2、ByteBuf使用模式
HEAP BUFFER(堆缓冲区)
最常用的模式是ByteBuf将数据存储在JVM的堆空间,这是通过将数据存储在数组里实现。堆缓冲区可以快速分配,当不使用时也可以快速释放。他还提供了直接访问数组的方法,通过ByteBuf.array()来获取byte[]数据。这种方法时最适合用来处理遗留数据的。如下:
(1)检查 ByteBuf 是否有支持数组。
(2)如果有的话,得到引用数组。
(3)计算第一字节的偏移量。
(4)获取可读的字节数。
(5)使用数组,偏移量和长度作为调用方法的参数。
创建堆缓冲区的方式:
注意:
- 访问非缓冲区ByteBuf的数组会导致UnsupportedOperationException,可以使用ByteBuf.hasArray()来检查是否支持访问数组
- 这个方法与JDK的ByteBuffer类似
DIRECT BUFFER(直接缓冲区)
“直接缓冲区”是另一种ByteBuf模式。在JDK1.4引入NIO的ByteBuffer类允许JVM通过本地方法调用分配内存,其目的是
- 通过免去中间交换的内存拷贝,提升IO处理速度,直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
- DireBuffer在 -XX:MaxDirectMemorySize=xxM大小限制,使用heap以外的内存。
这也就解释了为什么“直接缓冲区”,对于那些通过socket实现数据传输的应用来说,是一种非常理想的方式。如果你的数据是存放在堆中分配的缓冲区,那么实际上,在通过socket发送数据之前,JVM需要将数据复制到缓冲区。
但是直接缓冲区的缺点在内存空间的分配和释放上比堆缓冲区更复杂,另外一个缺点就是如果将数据传递给遗留代码处理,因为数据不是堆上,你可能不得不做出一个副本:
(1)检查 ByteBuf 是不是由数组支持。如果不是,这是一个直接缓冲区。
(2)获取可读的字节数
(3)分配一个新的数组来保存字节
(4)字节复制到数组
(5)将数组,偏移量和长度作为参数调用某些处理方法
创建直接缓冲区的方式:
显然,这比使用数组要多做一些工作。因此,如果你事前就知道容器里的数据将作为一个数组被访问,你可能更愿意使用堆内存。
COMPOSITE BUFFER(复合缓冲区)
最后一种模式是复合缓冲区,我们可以创建多个不同的 ByteBuf,然后提供一个这些 ByteBuf 组合的视图。复合缓冲区就像一个列表,我们可以动态的添加和删除其中的 ByteBuf,JDK 的 ByteBuffer 没有这样的功能。
Netty 提供了 ByteBuf 的子类 CompositeByteBuf 类来处理复合缓冲区,CompositeByteBuf 只是一个视图。
CompositeByteBuf.hasArray() 总是返回 false,因为它可能既包含堆缓冲区,也包含直接缓冲区。
一条消息由 header 和 body 两部分组成,将 header 和 body 组装成一条消息发送出去,可能 body 相同,只是 header 不同,使用CompositeByteBuf 就不用每次都重新分配一个新的缓冲区。下图显示CompositeByteBuf 组成 header 和 body:
来看一下用JDK的ByteBuffer的一个实现,两个ByteBuf的数组创建保存消息的组件,第三个用于保存所有的数据副本。
这种做法显然是低效的;分配和复制操作不是最优的方法,操纵数组使代码显得很笨拙。
下面看使用 CompositeByteBuf 的改进版本,你可以把CompositeByteBuf当做一个可迭代遍历的容器。
(1)追加 ByteBuf 实例的 CompositeByteBuf
(2)删除 索引1的 ByteBuf
(3)遍历所有 ByteBuf 实例。
CompositeByteBuf不允许访问其内部可能存在的支持数组,也不允许直接访问数据,这一点类似于直接缓冲区模式,如下代码:
(1)得到可读的字节数
(2)分配一个新的数组数组长度为可读字节长度
(3)读取字节到数组
(4)使用数组,把偏移量和长度作为参数
7.3.Netty字节级别的操作
除了基本的读写操作,ByteBuf还提供了它所包含的数据的修改方法。
随机访问索引
ByteBuf使用zero-based的indexing(从0开始的索引),第一个字节的索引是0,最后一个字节的索引是ByteBuf的capacity-1,下面代码是遍历ByteBuf的所有字节:
注意通过索引访问时不会推进readerIndex(读索引)和writeIndex(写索引),我们可以通过ByteBuf的readerIndex(index)或者writerIndex(index)来分别推进读索引和写索引。
7.4.Netty之ByteBufHolder的使用
我们时不时的会遇到这样的情况:即需要另外存储除有效的实际数据各种属性值。HTTP响应就是一个很好的例子。与内容一起的字节的还有状态码,cookies等。
Netty提供的ByteBufHolder可以对这种常见情况进行处理。ByteBufHolder还提供了堆于Netty的高级功能,如缓冲池,其中保存实际数据的ByteBuf可以从池中借用,如果需要还可以自动释放。
ByteBufHolder有那么几个方法。到底层的致谢支持接入数据和引用计数。
名称 | 描述 |
data() | 返回ByteBuf保存的数据 |
copy() | 制作一个ByteBufHolder的拷贝但不共享其数据(所以数据也是拷贝的) |
7.5.Netty之ByteBuf分配
ByteBuf实例管理的方式
1、ByteBufAllocator
为了减少分配和释放内存的开销,Netty通过支持池类ByteBufAllocator,可用于分配的任何ByteBuf。是否使用池是由应用程序决定的。
以下提供几个常用的API:
名称 | 描述 |
ByteBuf directBuffer() | 组合分配,把多个ByteBuf组合到一起变成一个整体 |
ByteBuf buffer() | 尽可能的分配一块堆外直接内存,如果系统不支持则分配堆内内存 |
ByteBuf ioBuffer() | 分配一块堆内内存 |
ByteBuf heapBuffer() | 分配一块堆外内存 |
通过一些方法接收整型参数允许用户指定ByteBuf的初始和最大容量值。ByteBuf存储可以扩大到其最大容量。得到一个ByteBufAlloctor的引用很简单。你可以从Channel,或者通过绑定到的ChannelHandler的ChannelHandlerContext得到它,用它实现了你的数据处理。
下面列表说明获得ByteBufAllocator的两种方式。
(1)从 channel 获得 ByteBufAllocator
(2)从 ChannelHandlerContext 获得 ByteBufAllocator
Netty提供了两种ByteBufAllocator的实现,一种是PooledByteBufAllocator池化的,一种是UnpooledByteBufAllocator非池化的。
2、Unpooled(非池化)缓存
当未引用ByteBufAllocator时,上面的方法无法访问到ByteBuf。对于这个用例Netty提供了一个实用工具类称为Unpooled,它提供了静态辅助方法来创建非池化的ByteBuf实例。
在ByteBuf下有三个重要的属性,writeIndex、readIndex、capacity
- writeIndex:就是当前操作写的下标
- readIndex:就是当前操纵读的下标
- capacity:当前ByteBuf的容量
ByteBuf实际就是将数据存储在了一个array[]数组中,里面来存储实际的数据,readIndex,writeIndex就是在操作这个数据,capacity就是这个数组的长度。
ByteBuf相比于NIO的ByteBuffer,它不用flip进行翻转,在NIO中操作ByteBuffer每次之后都需要读写进行翻转之后进行相反的操作,比如现在在操作读的操作,只有当flip之后,才能进行写的操作,可在Netty中并不使用flip,即可进行直接的读写切换,非常的方便。
根据ByteBuf维护的三个变量,readIndex,writeIndex,capacity,则可将arr[]数组分为三个区
- 0–readIndex —>已经读取过数据的区域
- readIndex–writeIndex —>未读取过数据的区域
- writeIndex–capacity —>可写数据的区域
- 在非联网项目,该Unoopled类也使得它更容易使用的ByteBuf API,获得一个高性能的可缓冲的API,而不需要Netty的其他部分。
3、ByteBufUtil
ByteBufUtil静态辅助方法来操作ByteBuf,因为这个API是通用的,与使用池无关,这些方法已经在外面的分配类实现。
也许最有价值的是hexDump()方法,
这个方法返回值指定ByteBuf中可读字节的十六进制字符串,可以用于调试程序时打印ByteBuf的内容。一个典型的用途是记录一个ByteBuf的内容进行调试。十六进制字符串相比字节而言对用户更加友好。而且十六进制版本可以很容易的转换回实际字节表示。
另一个有用的方法是使用boolean equals(ByteBuf,ByteBuf),用来比较ByteBuf实例是否相等。在实现自己ByteBuf的子类时经常用到。
4、池化/非池化ByteBuf的类关系图
7.6.Netty引用计数器
在Netty中ByteBuf和ByteBufHolder(两者都实现了ReferenceCounted接口)引入了引用计数器。引用计数器本身并不复杂,他能够在特定的对象上跟踪引用的数目,实现了ReferenceCoundted的类的实例通常开始于一个活动的引用计数器为1。而如果对象活动的引用计数器大于0,就会被保证不被释放。当数量引用减少到0,将释放该实例。需要注意的是“释放”的语义是特定于具体的实现。最起码,一个对象,它已被释放应不再可用。
这种技术就是诸如 PooledByteBufAllocator 这种减少内存分配开销的池化的精髓部分。
(1)从 channel 获取 ByteBufAllocator
(2)从 ByteBufAllocator 分配一个 ByteBuf
(3)检查引用计数器是否是 1
(1)release()将会递减对象引用的数目。当这个引用计数达到0时,对象已被释放,并且该方法返回 true。
如果尝试访问已经释放的对象,将会抛出 IllegalReferenceCountException 异常。
需要注意的是一个特定的类可以定义自己独特的方式其释放计数的“规则”。 例如,release() 可以将引用计数器直接计为 0 而不管当前引用的对象数目。
7.7.Netty中用到的设计模式
- 建造者模式:ServerBootstap
- 责任链模式:pipline的事件传播
- 工厂模式:创建channel信道
- 适配器模式:HandlerAdapter
8.Netty搭建单机百万连接
8.1.Netty单机百万连接方案
实现单机的百万连接,瓶颈有以下几点:
(1)如何模拟百万连接
(2)突破局部文件句柄的限制
(3)突破全局文件句柄的限制
在Linux系统中,单个进程打开的句柄数是非常有限的,一条TCP连接就对应一个文件句柄,而对于我们应用程序来说,一个服务端默认建立的连接数是有限制的。
如下图所示,通常一个客户端去除一些被占用的端口之后,可用的端口大于只有6w个左右,要想模拟百万连接要比较多的客户端,而且比较麻烦,所以比较麻烦,所以这种方案不适合。
在服务端启动800~8100,而客户端依旧使用1025-65535范围内可用的端口号,让同一个端口号,可以连接Server的不同端口。这样的话,6W的端口可以连接Server的100个端口,累加起来就能实现近600W左右的连接,TCP是以一个四元组概念,以原IP、原端口号、目的IP、目的端口号来确定的,当原IP 和原端口号相同,但目的端口号不同,最终系统会把他当成两条TCP 连接来处理,所以TCP连接可以如此设计。
8.2.Netty搭建百万连接案例
1、NettyServer服务端代码
public class NettyServer { public void run(int beginPort, int endPort) { System.out.println("服务端启动中。。"); //配置服务端线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) //.childOption(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_REUSEADDR, true); //快速复用端口 serverBootstrap.childHandler(new TCPCountHandler()); for (; beginPort < endPort; beginPort++) { int port = beginPort; serverBootstrap.bind(port).addListener((ChannelFutureListener) future -> { System.out.println("服务端成功绑定端口 port = " + port); }); } } /** * 启动入口 * * @param args */ public static void main(String[] args) { new NettyServer().run(NettyConfig.BEGIN_PORT, NettyConfig.END_PORT); } }
2、TCPCountHandler代码编写
@ChannelHandler.Sharable public class TCPCountHandler extends ChannelInboundHandlerAdapter { //使用原子类,避免线程安全问题 private AtomicInteger atomicInteger = new AtomicInteger(); public TCPCountHandler(){ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(()->{ System.out.println("当前连接数为 = "+atomicInteger.get()); },0,3, TimeUnit.SECONDS); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { atomicInteger.incrementAndGet(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { atomicInteger.decrementAndGet(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("TCPCountHandler exceptionCaught"); cause.printStackTrace(); ctx.close(); } }
3、NettyConfig配置类
public class NettyConfig { public static int BEGIN_PORT = 8000; public static int END_PORT = 8050; public static String SERVER_ADDR = "127.0.0.1"; }
4、NettyClient客户端代码
public class NettyClient { public void run(int beginPort,int endPort){ System.out.println("客户端启动中。。"); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_REUSEADDR,true) //快速复用端口 .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { } }); int index = 0; while(true){ int finalPort = beginPort + index; try { bootstrap.connect(NettyConfig.SERVER_ADDR,finalPort).addListener((ChannelFutureListener) future ->{ if (!future.isSuccess()){ System.out.println("创建连接失败 port = "+finalPort); } }).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } ++index; if(index == (endPort - beginPort)){ index = 0; } } } /** * 启动入口 * @param args */ public static void main(String[] args) { new NettyClient().run(NettyConfig.BEGIN_PORT,NettyConfig.END_PORT); } }
5、maven打包依赖加入pom.xml中
分两次打包,先打包server,在打包client,打包哪个主类的时候,把另一个先注掉。注意这块,打包之前先把NettyConfig中的地址改掉,改成Netty-server的地址。
<build> <plugins> <!--maven的默认编译使用的jdk版本貌似很低,使用maven-compiler-plugin插件可以指定项目源码的jdk版本--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> <!--将依赖的jar包打包到当前jar包,常规打包是不会将所依赖jar包打进来的--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- 打包server主类 --> <mainClass>com.lixiang.NettyServer</mainClass> <!-- 打包client主类 --> <!--<mainClass>com.lixiang.NettyClient</mainClass>--> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
8.3.Netty百万连接测试
1、环境准备
两台机器:192.168.159.60(netty-server)、192.168.159.61(netty-client)
虚拟机:centos7系统 4核8G(注意:这块系统参数至少要4核6G)
192.168.159.60(netty-server)放置NettyServer主类jar包
192.168.159.61(netty-client)放置NettyClient主类jar包
先启动server端的jar包,在启动client端的jar包,启动命令:java -jar million-server-1.0-SNAPSHOT.jar
我们可以看到当前的连接数一直在4000上不去。出现异常 Caused by: java.io.IOException: Too many open files。
too many open files:顾名思义即打开过多文件数。不过这里的files不单是文件的意思,也包括打开的通讯链接(比如socket),正在监听的端口等等,所以有时候也可以叫做句柄(handle),这个错误通常也可以叫做句柄数超出系统限制。Linux是有文件句柄限制的,而且默认不是很高,一般都是1024。查看当前用户句柄数限制:
ulimit -n
我们可以看到当前的文件句柄数是1024,我们的机器是4核的所以大概的连接数在4000左右,那么如何提高文件句柄数呢?
2、修改文件句柄数,让netty-server支持百万连接
(1)root身份下编解/etc/security/limits.conf
vi /etc/security/limits.conf 增加如下: root soft nofile 1000000 root hard nofile 1000000 * soft nofile 1000000 * hard nofile 1000000
(2)修改全局文件句柄限制(所有进程最大打开的文件数,不同系统是不一样,可以直接echo临时修改)
查看命令:cat /proc/sys/fs/file-max
(3)永久修改全局文件句柄, 修改后生效 sysctl -p
vi /etc/sysctl.conf 增加 fs.file-max = 1000000 使其生效:sysctl -p
(4)修改完成后重启机器,client端配置也是一样的
reboot
(5)启动运行jar包
java -jar million-server-1.0-SNAPSHOT.jar -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g