Okio
1. Okio简介
引用官方的一段介绍
Okio是一个补充java.io和java.nio的库,使访问,存储和处理数据变得更加容易。 它最初是作为Android中包含的功能强大的HTTP客户端OkHttp的一个组件。 它运作良好,随时准备解决新问题。
2. 从HelloWorld开始
我们知道,在java.io中InputStream和OutputStream分别代表了输入流,和输出流。相应的在Okio中Source和Sink分别代表了输入流和输出流。接下来我们分别用java.io和Okio实现打印文本内容功能
假设有个文件helloworld.txt文件内容如下
Hello World! Hello World!
java.io实现打印功能
try { File file = new File("helloworld.txt"); FileInputStream fileInputStream = new FileInputStream(file);//1 byte[] buffer = new byte[(int) file.length()];//2 fileInputStream.read(buffer);//3 System.out.write(buffer);//4 } catch (Exception e) { e.printStackTrace(); }
Okio实现打印功能
try { File file = new File("helloworld.txt"); Source source = Okio.source(file);//a Buffer buffer = new Buffer();//b source.read(buffer, file.length());//c Sink sink = Okio.sink(System.out); sink.write(buffer, file.length());//d } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
上面两段代码实现的功能都是一样的,实现思路总结如下
1. 获取文件的输入流 //1和//a处实现
2. 将文件输入流读取到缓冲区 //3和//c处实现
3. 将缓冲区的数据写入到Sytem.out流中 //4和//d处
3.Source Sink源码讲解
1. Okio Source Sink
从上面的例子我们可以把Source想象成InputStream,把Sink想象成OutputStream。通过下面的图片,我们来看下Source和Sink的定义
Source通过read(Buffer sink,long byteCount)方法,把磁盘,网络,或者内存中的输入流的数据读取到内存的Buffer中。
Sink刚好相反,它通过write(Buffer source,long byteCount)方法把内存Buffer中的数据写入到输出流中。
Okio中定义了生成Source的静态方法,source(File file)、source(InputStream in)、source(InputStream in,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Response。这很重要是OkHttp IO操作的核心。这四个重载方法真正的实现是在source(InputStream in,Timeout timeout)中
Okio中定义了生成Sink的静态方法,sink(File file)、sink(OutputStream out)、source(OutputStream out,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Request。同样这也是OkHttp IO操作的核心。这四个重载方法真正的实现是在sink(OutputStream out,Timeout timeout)中
2. Source Sink的创建
2.1 Okio source(InputStream in,Timeout timeout)
private static Source source(final InputStream in, final Timeout timeout) { if (in == null) throw new IllegalArgumentException("in == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); if (byteCount == 0) return 0; try { timeout.throwIfReached(); Segment tail = sink.writableSegment(1); int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; tail.limit += bytesRead; sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } @Override public void close() throws IOException { in.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "source(" + in + ")"; } }; }
仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法
read public int read(byte[] b, int off, int len) throws IOException Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer. This method blocks until input data is available, end of file is detected, or an exception is thrown.
仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法
read public int read(byte[] b, int off, int len) throws IOException Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer. This method blocks until input data is available, end of file is detected, or an exception is thrown.
翻译如下:从输入流中读取len个字节到字节数组b中。这个方法可能会被阻塞,直到输入流数据可用。
如此说来tail.data应该是个byte[],tail.limit是读取流的起始位置,maxToCopy是要读取的字节的长度了。没错是这样的。
Segment tail = sink.writableSegment(1);这句代码目前我还是看不懂啊,Segment是什么呀?它和Buffer之间的关系是什么呀?上图!一图抵千言
虽然说一图抵千言,还是做个简单的讲解吧。
Segment说白了就是byte[],每个绿色的或者白色的小方格代表一个byte。绿色表示已经有数据了,白色表示没有数据。pos指向第一个绿色的格子,表示读取数据的位置,limit指向第一个白色的格子,表示写入数据的位置。
Buffer是一个由Segment组成的双链表。每一个Segment最多可以容下8192个字节。在向Buffer的Segment写入数据时,如果超过了8192个字节,那么会从SegmentPool(一个对象池,最多可以容下8个Segment)拿一个Segment或者新建一个Segment(因为SegmentPool中的对象都被用光了)加入到双链表的尾端
接下来我们来分析下Segment源码,毕竟Talk Is Cheap,Show Me The Code。由于Segment代码还是比较简单的。所以我就在源码中加入注释来讲解
final class Segment { /** 每个Segment最大容量8KB */ static final int SIZE = 8192; /** Segment分两种,只读和可写。当Segment需要被拆分成两个小的Segment的时候,如果被拆分 * 出去的Segment的大小超过1024,那么那个Segment会被定义成只读的。(暂时不理解没关系) */ static final int SHARE_MINIMUM = 1024; /**真正存储数据的byte数组**/ final byte[] data; /** 读取数据的地方 参考前面的图片解释 */ int pos; /** 写数据的地方 参考前面的图片解释 */ int limit; /** 只读模式 */ boolean shared; /** 可写模式 */ boolean owner; /** 双链表的next指针 */ Segment next; /** 双链表的prev指针 */ Segment prev; Segment() { this.data = new byte[SIZE]; this.owner = true;//默认是可写的 this.shared = false; } /**当前Segment从双链表中出队**/ public Segment pop() { Segment result = next != this ? next : null; prev.next = next; next.prev = prev; next = null; prev = null; return result; } /**插入一个新的Segment到当前的Segment的后面**/ public Segment push(Segment segment) { segment.prev = this; segment.next = next; next.prev = segment; next = segment; return segment; } /** *把当前的Segment分成两个Segment。 *使用场景 把当前Segment A写入到Segment B中。将设A中数据的大小是2KB(记得容量是8KB) *B中的数据是7KB(剩余空间1KB),这样A往B中写数据,肯定是写不完的,需要把A分成 *A1(新建的Segment)和A(原来的A,需要更新pos) **/ public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; if (byteCount >= SHARE_MINIMUM) { //如果写入的数据超过1kb 新建一个只读的Segment,避免arrayCopy prefix = new Segment(this); } else { //从SegmentPool中拿一个Segment prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; //插入到当前Segment A的前面 A1->A prev.push(prefix); return prefix; } /**对多个Segment的空间做压缩,用来HashSource,HashSink,GzipSource, * GzipSink(还怕别人问你Gzip在OkHttp中的实现原理吗) **/ public void compact() { //如果只有一个Segment 不需要压缩 if (prev == this) throw new IllegalStateException(); // 如果前面的Segment是只读的,没法压缩 if (!prev.owner) return; int byteCount = limit - pos; int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos); //两个Segment的总大小不大于8kb 可以合并成一个,否则返回 if (byteCount > availableByteCount) //把当前的Segment的数据写入到前面的Segment中 writeTo(prev, byteCount); //当前的Segment出队列 pop(); //回收Segment SegmentPool.recycle(this); } /**写byteCount个数据到sink中**/ public void writeTo(Segment sink, int byteCount) { if (!sink.owner) throw new IllegalArgumentException(); if (sink.limit + byteCount > SIZE) { // We can't fit byteCount bytes at the sink's current position. Shift sink first. if (sink.shared) throw new IllegalArgumentException(); if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException(); System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos); sink.limit -= sink.pos; sink.pos = 0; } System.arraycopy(data, pos, sink.data, sink.limit, byteCount); sink.limit += byteCount; pos += byteCount; } }
总结下Segment的知识
- Segment其实就是个byte[]
- Segemnt记录了byte的读写指针pos和limit
- Segment维护一个双链表
接下来我们来分析 Segment tail = sink.writableSegment(1)
Buffer.java
Segment writableSegment(int minimumCapacity) { if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException(); //如果buffer 还没有初始化,从对象池拿一个Segment,同时初始化双链表 if (head == null) { head = SegmentPool.take(); // Acquire a first segment. return head.next = head.prev = head; } //拿到双链表的最后一个Segment Segment tail = head.prev; //判断最后tail的空间够不够,tail是不是只读的Segment if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) { //如果空间不够,或者是只读的 重新拿一个Segment放入到链表尾部 tail = tail.push(SegmentPool.take()); } return tail; }
skink.writeableSegment(1)的功能就是,从Buffer 的Segment链表中取到链表最后一个Segment,这个Segment需要满足两个条件1.可写 2.可写空间大于1个字节
到这里咱们基本上把int bytesRead = in.read(tail.data, tail.limit, maxToCopy)和Segment tail = sink.writableSegment(1)讲解清楚了。那么我们再重新看下Okio.source(InputStream in,Timeout timeOut) return new Source()代码块的read方法
@Override public long read(Buffer sink, long byteCount) throws IOException { if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); if (byteCount == 0) return 0; try { //支持超时读取 timeout.throwIfReached(); //拿到buffer中链表的最后一个可写的Segment Segment tail = sink.writableSegment(1); //获取最大能往tail中写多少个字节 int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); //计算往Segment写了多少数据(为什么是写,对buffer来说就是写) int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; //更新写的位置 tail.limit += bytesRead; //增加buffer的数据总量 sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } }
总结下Okio.source(InputStream in, Timeout timeout)
从Buffer(sink)中找到链表中最后一个可写的并且还有写入空间的Segment记做tail
判断最多能写多少数据到Buffer(sink)中记做maxToCopy
从InputStream(in)中读取maxToCopy个数据到tail中
根据第三条的结论来看,比如你调用了soure.read(buffer,10*1024),那其实返回的肯定是比 10*1024少。举例说明,拿前面的helloworld举例。现在我从网络粘贴了 老罗android开发之旅的一篇文章到helloword.txt里并重复了3遍。文件大小为35243个字节
try { File file = new File("helloworld.txt"); System.out.println("file.length "+file.length()); Source source = Okio.source(file); Sink sink = Okio.sink(System.out); Buffer buffer = new Buffer(); source.read(buffer, file.length()); System.out.println(buffer.size()); // sink.write(buffer, file.length()); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
执行输出结果如下
file.length 35243 buffer size 8192 • 1 • 2 • 3
如果需要正确的把所有数据都写入到buffer中就需要用while循环了
try { File file = new File("helloworld.txt"); System.out.println("file.length "+file.length()); long fileLength = file.length(); Source source = Okio.source(file); Sink sink = Okio.sink(System.out); Buffer buffer = new Buffer(); while (fileLength!=0) { long hasRead = source.read(buffer, file.length()); fileLength-=hasRead; } System.out.println("buffer size "+buffer.size()); // sink.write(buffer, file.length()); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
执行输出结果如下
file.length 35243 buffer size 35243
好了,至此Source基本上讲解完毕,接下来讲解Sink,老规矩还是从Okio.sink(OutputStream out,Timeout timeout)讲起
2.2 Okio.sink(final OutputStream out, final Timeout timeout)
private static Sink sink(final OutputStream out, final Timeout timeout) { if (out == null) throw new IllegalArgumentException("out == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Sink() { //把Buffer中的数据写入到sink中 @Override public void write(Buffer source, long byteCount) throws IOException { //检查buffer中的数据数量是否合法(如果buffer数量<byteCount就不合法) checkOffsetAndCount(source.size, 0, byteCount); //自带while循环,直到把buffer中的数据耗尽 while (byteCount > 0) { timeout.throwIfReached(); //从buffer的第一个Segment开始 Segment head = source.head; int toCopy = (int) Math.min(byteCount, head.limit - head.pos); out.write(head.data, head.pos, toCopy); head.pos += toCopy; byteCount -= toCopy; source.size -= toCopy; //如果当前Segment写完了出队列 if (head.pos == head.limit) { source.head = head.pop(); SegmentPool.recycle(head); } } } @Override public void flush() throws IOException { out.flush(); } @Override public void close() throws IOException { out.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "sink(" + out + ")"; } };
总结下Okio.sink(OutputStream out,Timeout timeout)
检查Buffer的size 和readCount是否合法
获取Buffer中的Head Segment,把Segment中的数据写入到OutputStream,如果当前Segment数据写完了,Segment出队列,并放回对象池
判断数据是否写完,如果没写完,重复第二部
4. BufferedSource BufferedSink源码
BufferedSource、BufferedSink 与Source和Sink的区别如下
BufferedSource、BufferedSink内部维护了一个Buffer对象
BufferedSource、BufferedSink内部分别引用了Source、Sink对象
BufferedSource的read(Buffer sink,long byteCount)使用内部的Source的read(Buffer sink,long byteCount)方法
BufferedSink的write(Buffer source,long byteCount)使用内部的Sink的write(Buffer source,long byteCount)方法
BufferedSource、BufferedSink内部扩展了很多readXX方法如 readByte/writeByte、readInt/writeInt等等
关于BufferedXX系列的源码可能需要再写一篇文章详细讲解。不过也是挺简单的。如果你看懂了本文自行分析BufferedXX应该是不在话下
5. 扩展Source Sink
Okio内部有不少已实现的Source和Sink。例如GzipSource/GzipSink、HashingSource/HashingSink。至于源码分析,请读者自行分析。
6. 接下来要做的事情
Okio是OkHttp IO操作的基石。接下来我们将带着Okio的学习成果进入OkHttp源码分析