读取
例如读了 4 次,每次一个字节
1. ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); 2. buffer.writeBytes(new byte[]{1, 2, 3, 4}); 3. buffer.writeInt(5); 4. log(buffer); 5. buffer.writeInt(6); 6. log(buffer); 7. System.out.println(buffer.readByte()); 8. System.out.println(buffer.readByte()); 9. System.out.println(buffer.readByte()); 10. System.out.println(buffer.readByte()); 11. log(buffer);
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
1. buffer.markReaderIndex(); 2. System.out.println(buffer.readInt()); 3. log(buffer);
这时要重复读取的话,重置到标记位置 reset
1. buffer.resetReaderIndex(); 2. log(buffer); 3. System.out.println(buffer.readInt()); 4. log(buffer);
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index
retain&release
由于堆外内存并不直接控制于JVM,因此只能等到full GC的时候才能垃圾回收
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
不是我们想象的(一般情况下)
1. ByteBuf buf = ... 2. try { 3. ... 4. } finally { 5. buf.release(); 6. }
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
- 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
- 入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
- 异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
1. // io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object) 2. protected void onUnhandledInboundMessage(Object msg) { 3. try { 4. logger.debug( 5. "Discarded inbound message {} that reached at the tail of the pipeline. " + 6. "Please check your pipeline configuration.", msg); 7. } finally { 8. ReferenceCountUtil.release(msg); 9. } 10. }
具体代码
1. // io.netty.util.ReferenceCountUtil#release(java.lang.Object) 2. public static boolean release(Object msg) { 3. if (msg instanceof ReferenceCounted) { 4. return ((ReferenceCounted) msg).release(); 5. } 6. return false; 7. }
slice
【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
原始 ByteBuf 进行一些初始操作
1. ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10); 2. origin.writeBytes(new byte[]{1, 2, 3, 4}); 3. origin.readByte(); 4. System.out.println(ByteBufUtil.prettyHexDump(origin));
+-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+