《跟闪电侠学Netty》阅读笔记 - 数据载体ByteBuf

简介: 《跟闪电侠学Netty》阅读笔记 - 数据载体ByteBuf

《跟闪电侠学Netty》阅读笔记 - 数据载体 ByteBuf

引言

API设计更建议实战过程中逐渐了解熟悉掌握,本文记录基础设计和相关API,只需要大致了解ByteBuf设计思想即可。

思维导图

www.mubucm.com/doc/58ehM7v…

image.png

基础结构

整个ByteBuf的数据结构组成如下,整个设计思想有点类似计算机如何实现从北京到上海,那就是一段足够长的铁轨,不断“拆掉”后面的铁轨放到前面的铁轨上,这样实现火车一直在铁轨上跑的错觉。

  • 容量上限:maxCapacity
  • 容量:capacity
  • 数组:
  • 废弃字节 :被丢弃的字节数据无效
  • 可读字节(writerIndex -readerIndex)
  • 可写字节(capacity - writerIndex)
  • 读指针 readerIndex :每读取(read)一个字节,readerIndex 自增 1
  • 写指针 writerIndex :每写入(write)一个字节,writeIndex 自增 1
  • 剩余可用空间

image.png

结构解析

  1. 字节容器:分为三部分
  • 废弃空间 :被丢弃的字节,数据无效
  • 可读空间 :从ByteBuf读取出来的数据都属于这部分
  • 可写空间 :未来所有的写入都会写入到此处
  1. 划分依据:两个指针加一个变量
  • 读指针
  • 写指针
  • 总容量
  1. 最大容量和和容量限制
  • 容量可以在写满的时候扩容
  • 如果扩容到最大容量就报错

容量API

实践容量API之前,我们先构建ByteBuf。

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);  
print("allocate ByteBuf(9,100) => {} \n", buffer);
// allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)

capacity()

表示 ByteBuf 底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节),不同的底层实现机制有不同的计算方式,后面我们讲 ByteBuf 的分类的时候会讲到。

从案例看出,默认创建的方式容量为初始化指定的容量。

print("allocate ByteBuf(9,100) capacity => {} \n", buffer.capacity());
// allocate ByteBuf(9,100) capacity => 9

maxCapacity()

表示 ByteBuf 底层最大能够占用多少字节的内存,当向 ByteBuf 中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到 maxCapacity,超过这个数,就抛异常。

从案例可以得知,如果扩容到 100 就会报错

print("allocate ByteBuf(9,100) maxCapacity => {} \n", buffer.maxCapacity());  
// allocate ByteBuf(9,100) maxCapacity => 100

readableBytes() 与 isReadable()

readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于 writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false。

// readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());  
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
//        allocate ByteBuf(9,100) isReadable => false  
//        allocate ByteBuf(9,100) readableBytes => 0
// write 方法改变写指针  
buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100) 
// 写入数据之后,重新执行readableBytes() 与 isReadable()
print("allocate ByteBuf(9,100) isReadable => {} \n", buffer.isReadable());
print("allocate ByteBuf(9,100) readableBytes => {} \n", buffer.readableBytes());
//allocate ByteBuf(9,100) isReadable => true 
//allocate ByteBuf(9,100) readableBytes => 4

writableBytes()、 isWritable() 与 maxWritableBytes()

writableBytes() 表示 ByteBuf 当前可写的字节数,它的值等于 capacity-writerIndex,如果两者相等,则表示不可写,isWritable() 返回 false。

注意这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往 ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity

maxWritableBytes() 就表示可写的最大字节数,它的值等于 maxCapacity-writerIndex

在初始化构建过程中,由于没有读写任何数据,可以看到他们的值基本和前面计算的容量是一致的。

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100); 
// writableBytes()、 isWritable() 与 maxWritableBytes()
print("allocate ByteBuf(9,100) writableBytes => {} \n", buffer.writableBytes());
print("allocate ByteBuf(9,100) isWritable => {} \n", buffer.isWritable());
print("allocate ByteBuf(9,100) maxWritableBytes => {} \n", buffer.maxWritableBytes());
//        allocate ByteBuf(9,100) writableBytes => 9
//        allocate ByteBuf(9,100) isWritable => true
//        allocate ByteBuf(9,100) maxWritableBytes => 100

读写指针相关的 API

实践读写指针相关的 API之前,我们先构建初始化ByteBuf。

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);  
print("allocate ByteBuf(9,100) => {} \n", buffer);

image.png

readerIndex() 与 readerIndex(int)

  • readerIndex():返回当前的读指针 readerIndex
  • readerIndex(int):表示设置读指针

在没有写入任何数据的时候,读指针为0。

// readerIndex() 返回当前读指针  
print("allocate ByteBuf(9,100) readerIndex => {} \n", buffer.readerIndex());  
// allocate ByteBuf(9,100) readerIndex => 0

下面的案例说明读指针不能越过写指针的界限。

// 尝试手动重定向渎指针位置  
// print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));  
// readerIndex: 2, writerIndex: 0 (expected: 0 <= readerIndex <= writerIndex <= capacity(9))

我们写入一些数据之后,再进行读指针重定向。

buffer.writeBytes(new byte[]{1, 2, 3, 4});
// 重定向读指针  
print("allocate ByteBuf(9,100) readerIndex(int) => {} \n", buffer.readerIndex(2));  
print("重定向读指针 之后 (new byte[]{1,2,3,4}) => {} \n", buffer);  
// allocate ByteBuf(9,100) readerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100) 
// 重定向读指针 之后 (new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 2, widx: 4, cap: 9/100)

writerIndex() 与 writerIndex(int)

  • writeIndex() 表示返回当前的写指针 writerIndex。
  • writeIndex(int) 表示设置写指针。

案例以初始化写入四个字节之后作为开始。

// writeIndex() 与 writeIndex(int)
print("allocate ByteBuf(9,100) writerIndex => {} \n", buffer.writerIndex());
print("allocate ByteBuf(9,100) writerIndex(int) => {} \n", buffer.writerIndex(2));
//        allocate ByteBuf(9,100) writerIndex => 4
//        allocate ByteBuf(9,100) writerIndex(int) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 9/100)

markReaderIndex() 与 resetReaderIndex()

区别:

  • markReaderIndex() :表示把当前的读指针保存起来,
  • resetReaderIndex() :表示把当前的读指针恢复到之前保存的值。

下面两段代码是等价的。

// 代码片段1
int readerIndex = buffer.readerIndex();
// … 其他操作
buffer.readerIndex(readerIndex);
// 代码片段二
// (不需要自己定义变量,推荐使用)
buffer.markReaderIndex();
// … 其他操作
// resetReaderIndex() 可以恢复到之前状态
// (解析自定义协议的数据包常用)
buffer.resetReaderIndex();

希望大家多多使用代码片段二这种方式,不需要自己定义变量,无论 buffer 当作参数传递到哪里,调用 resetReaderIndex() 都可以恢复到之前的状态,在解析自定义协议的数据包的时候非常常见,推荐大家使用这一对 API markWriterIndex() 与 resetWriterIndex() 这一对 API 的作用与上述一对 API 类似

读写API

实践读写API之前,我们先构建ByteBuf。

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);  
print("allocate ByteBuf(9,100) => {} \n", buffer);

上面的代码

writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)

writeBytes() 表示把字节数组 src 里面的数据全部写到 ByteBuf。注意此方法执行之后,会移动前面介绍的 writeIndex 写指针。

// write 方法改变写指针  
buffer.writeBytes(new byte[]{1, 2, 3, 4});  
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);  
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)

readBytes() 指的是把 ByteBuf 里面的数据全部读取到 dst。

//只有read方法改变指针  
byte[] bytes = new byte[buffer.readableBytes()];  
buffer.readBytes(bytes);  
print("bytes 内容 => {}", bytes);  
// bytes 内容 =>    

这里 dst 字节数组的大小通常等于 readableBytes(),而 src 字节数组大小的长度通常小于等于writableBytes()。

writeByte(byte b) 与 buffer.readByte()

writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 这里就不一一赘述。

// write 方法改变写指针  
buffer.writeBytes(new byte[]{1, 2, 3, 4});  
print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);  
// 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)buffer.writeByte(5);  
print("改变写指针 buffer.writeByte(5) => {} \n", buffer);  
// 改变写指针 buffer.writeByte(5) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 9/100)

getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针。

//只有read方法改变指针  
byte[] bytes = new byte[buffer.readableBytes()];  
buffer.readBytes(bytes);
print("buffer.readBytes(bytes) => {}\n", buffer);  
// buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)

release() 与 retain()

由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。

Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,就需要回收底层内存。

默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。

Test

最后是简单测试程序整合前面的API案例。

public class ByteBufTest {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
        print("allocate ByteBuf(9, 100)", buffer);
        // write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        print("writeBytes(1,2,3,4)", buffer);
        // write 方法改变写指针,写完之后写指针未到 capacity 的时候,buffer 仍然可写, 写完 int 类型之后,写指针增加4
        buffer.writeInt(12);
        print("writeInt(12)", buffer);
        // write 方法改变写指针, 写完之后写指针等于 capacity 的时候,buffer 不可写
        buffer.writeBytes(new byte[]{5});
        print("writeBytes(5)", buffer);
        // write 方法改变写指针,写的时候发现 buffer 不可写则开始扩容,扩容之后 capacity 随即改变
        buffer.writeBytes(new byte[]{6});
        print("writeBytes(6)", buffer);
        // get 方法不改变读写指针
        System.out.println("getByte(3) return: " + buffer.getByte(3));
        System.out.println("getShort(3) return: " + buffer.getShort(3));
        System.out.println("getInt(3) return: " + buffer.getInt(3));
        print("getByte()", buffer);
        // set 方法不改变读写指针
        buffer.setByte(buffer.readableBytes() + 1, 0);
        print("setByte()", buffer);
        // read 方法改变读指针
        byte[] dst = new byte[buffer.readableBytes()];
        buffer.readBytes(dst);
        print("readBytes(" + dst.length + ")", buffer);
    }
    private static void print(String action, ByteBuf buffer) {
        System.out.println("after ===========" + action + "============");
        System.out.println("capacity(): " + buffer.capacity());
        System.out.println("maxCapacity(): " + buffer.maxCapacity());
        System.out.println("readerIndex(): " + buffer.readerIndex());
        System.out.println("readableBytes(): " + buffer.readableBytes());
        System.out.println("isReadable(): " + buffer.isReadable());
        System.out.println("writerIndex(): " + buffer.writerIndex());
        System.out.println("writableBytes(): " + buffer.writableBytes());
        System.out.println("isWritable(): " + buffer.isWritable());
        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
        System.out.println();
    }
}

最后是个人的实验Test类。

/**  
 * byteBuf 的API测试  
 */  
public class ByteBufTest {  
    public static void main(String[] args) {  
        // 9 代表初始容量, 100代表最大容量  
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);  
        print("allocate ByteBuf(9,100) => {} \n", buffer);  
        //allocate ByteBuf(9,100) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 9/100)  
        // write 方法改变写指针  
        buffer.writeBytes(new byte[]{1, 2, 3, 4});  
        print("改变写指针 writeBytes(new byte[]{1,2,3,4}) => {} \n", buffer);  
        // 改变写指针 writeBytes(new byte[]{1,2,3,4}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100)  
        // write 改变写指针,如果没有到达 capacity 依然可以写入,写入 int 之后写指针增加4  
        buffer.writeInt(12);  
        print("buffer.writeInt(12) => {}\n", buffer);  
        // buffer.writeInt(12) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 9/100)  
        // 继续改变写指针,当前写入等于  initialCapacity 这个初始值之后将不能继续写入  
        buffer.writeBytes(new byte[]{5});  
        print("writeBytes(new byte[]{5}) => {}\n", buffer);  
        // writeBytes(new byte[]{5}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 9, cap: 9/100)  
        // 继续写入指针,此时发现 已经超过 initialCapacity 的值,此时会进行扩容  
        buffer.writeBytes(new byte[]{6});  
        print("writeBytes(new byte[]{6}) => {}\n", buffer);  
        // writeBytes(new byte[]{6}) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)  
        // get 方法调用之后不改变读指针  
        print("getByte(3) return => {}\n", buffer.getByte(3));  
        print("getShort(3) return => {}\n", buffer.getShort(3));  
        print("getInt(3) return => {}\n", buffer.getInt(3));  
        print("getChar(3) return => {}\n", buffer.getChar(3));  
        /*  
       getByte(3) return => 4        getShort(3) return => 1024        getInt(3) return => 67108864        getChar(3) return => Ѐ  
        * */  
        // set 方法不改变读写指针  
        buffer.setByte(buffer.readableBytes() + 1, 0);  
        print("setByte(buffer.readableBytes() + 1, 0) => {}\n", buffer);  
        // setByte(buffer.readableBytes() + 1, 0) => PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 16/100)  
        //只有read方法改变指针  
        byte[] bytes = new byte[buffer.readableBytes()];  
        buffer.readBytes(bytes);  
        print("buffer.readBytes(bytes) => {}\n", buffer);  
        // buffer.readBytes(bytes) => PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)  
        print("buffer.readBytes(readBuffer); => {}\n", buffer);  
        ByteBuf readBuffer = ByteBufAllocator.DEFAULT.buffer(6, 6);  
        // 原始writeIndex要有足够空间可读  
//        buffer.writeBytes(new byte[]{5,1,1,1,1,1,1,1});  
//        buffer.writeBytes(new byte[]{5});  
        //  readerIndex(10) + length(6) exceeds writerIndex(11): PooledUnsafeDirectByteBuf(ridx: 10, widx: 11, cap: 16/100)        buffer.readBytes(readBuffer);  
        System.err.println(readBuffer.readableBytes());  
//        buffer.readBytes(readBuffer);  
        // readerIndex(10) + length(9) exceeds writerIndex(10): PooledUnsafeDirectByteBuf(ridx: 10, widx: 10, cap: 16/100)  
    }  
}

写在最后

比较简单的一个章节,主要介绍ByteBuf在Java中的使用,整个使用过程简单易懂十分清晰。

相关文章
|
8月前
|
Java
【Netty 网络通信】传统IO方式处理网络IO数据
【1月更文挑战第9天】【Netty 网络通信】传统IO方式处理网络IO数据
|
8月前
|
缓存 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
224 0
|
7月前
|
移动开发 前端开发 网络协议
技术笔记:Netty专题(六)
技术笔记:Netty专题(六)
43 0
|
7月前
netty查看ByteBuf工具
netty查看ByteBuf工具
|
8月前
|
JSON 移动开发 网络协议
数据拆散与黏连:深入剖析Netty中的半包与粘包问题
数据拆散与黏连:深入剖析Netty中的半包与粘包问题
177 0
|
8月前
|
消息中间件 缓存 Java
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
194 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13534 1
|
8月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
149 1
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
178 1
|
8月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
173 0