OkHttp源码详解之Okio源码详解

简介: OkHttp源码详解之Okio源码详解

Okio


1. Okio简介

引用官方的一段介绍


Okio是一个补充java.io和java.nio的库,使访问,存储和处理数据变得更加容易。 它最初是作为Android中包含的功能强大的HTTP客户端OkHttp的一个组件。 它运作良好,随时准备解决新问题。


2. 从HelloWorld开始

我们知道,在java.io中InputStream和OutputStream分别代表了输入流,和输出流。相应的在Okio中Source和Sink分别代表了输入流和输出流。接下来我们分别用java.io和Okio实现打印文本内容功能


假设有个文件helloworld.txt文件内容如下

Hello World!
Hello World!

java.io实现打印功能

try {
    File file = new File("helloworld.txt");
    FileInputStream fileInputStream = new FileInputStream(file);//1
    byte[] buffer = new byte[(int) file.length()];//2
    fileInputStream.read(buffer);//3
    System.out.write(buffer);//4
} catch (Exception e) {
    e.printStackTrace();
}

Okio实现打印功能

try {
    File file = new File("helloworld.txt");
    Source source = Okio.source(file);//a
    Buffer buffer = new Buffer();//b
    source.read(buffer, file.length());//c
    Sink sink = Okio.sink(System.out);
    sink.write(buffer, file.length());//d
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

上面两段代码实现的功能都是一样的,实现思路总结如下


1. 获取文件的输入流 //1和//a处实现

2. 将文件输入流读取到缓冲区 //3和//c处实现

3. 将缓冲区的数据写入到Sytem.out流中 //4和//d处

okio示意图.png

3.Source Sink源码讲解


1. Okio Source Sink


从上面的例子我们可以把Source想象成InputStream,把Sink想象成OutputStream。通过下面的图片,我们来看下Source和Sink的定义

Okio Source Sink.png


Source通过read(Buffer sink,long byteCount)方法,把磁盘,网络,或者内存中的输入流的数据读取到内存的Buffer中。


Sink刚好相反,它通过write(Buffer source,long byteCount)方法把内存Buffer中的数据写入到输出流中。


Okio中定义了生成Source的静态方法,source(File file)、source(InputStream in)、source(InputStream in,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Response。这很重要是OkHttp IO操作的核心。这四个重载方法真正的实现是在source(InputStream in,Timeout timeout)中


Okio中定义了生成Sink的静态方法,sink(File file)、sink(OutputStream out)、source(OutputStream out,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Request。同样这也是OkHttp IO操作的核心。这四个重载方法真正的实现是在sink(OutputStream out,Timeout timeout)中


2. Source Sink的创建

2.1 Okio source(InputStream in,Timeout timeout)

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }
      @Override public void close() throws IOException {
        in.close();
      }
      @Override public Timeout timeout() {
        return timeout;
      }
      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法

read
public int read(byte[] b,
       int off,
       int len)
         throws IOException
Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
This method blocks until input data is available, end of file is detected, or an exception is thrown.

仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法

read
public int read(byte[] b,
       int off,
       int len)
         throws IOException
Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
This method blocks until input data is available, end of file is detected, or an exception is thrown.

翻译如下:从输入流中读取len个字节到字节数组b中。这个方法可能会被阻塞,直到输入流数据可用。


如此说来tail.data应该是个byte[],tail.limit是读取流的起始位置,maxToCopy是要读取的字节的长度了。没错是这样的。


Segment tail = sink.writableSegment(1);这句代码目前我还是看不懂啊,Segment是什么呀?它和Buffer之间的关系是什么呀?上图!一图抵千言

OKIO Segment.png

OKIO Buffer.png

虽然说一图抵千言,还是做个简单的讲解吧。


Segment说白了就是byte[],每个绿色的或者白色的小方格代表一个byte。绿色表示已经有数据了,白色表示没有数据。pos指向第一个绿色的格子,表示读取数据的位置,limit指向第一个白色的格子,表示写入数据的位置。


Buffer是一个由Segment组成的双链表。每一个Segment最多可以容下8192个字节。在向Buffer的Segment写入数据时,如果超过了8192个字节,那么会从SegmentPool(一个对象池,最多可以容下8个Segment)拿一个Segment或者新建一个Segment(因为SegmentPool中的对象都被用光了)加入到双链表的尾端


接下来我们来分析下Segment源码,毕竟Talk Is Cheap,Show Me The Code。由于Segment代码还是比较简单的。所以我就在源码中加入注释来讲解

final class Segment {
  /** 每个Segment最大容量8KB */
  static final int SIZE = 8192;
  /** Segment分两种,只读和可写。当Segment需要被拆分成两个小的Segment的时候,如果被拆分
   * 出去的Segment的大小超过1024,那么那个Segment会被定义成只读的。(暂时不理解没关系)
  */
  static final int SHARE_MINIMUM = 1024;
  /**真正存储数据的byte数组**/
  final byte[] data;
  /** 读取数据的地方 参考前面的图片解释 */
  int pos;
  /** 写数据的地方 参考前面的图片解释 */
  int limit;
  /** 只读模式 */
  boolean shared;
  /** 可写模式 */
  boolean owner;
  /** 双链表的next指针 */
  Segment next;
  /** 双链表的prev指针 */
  Segment prev;
  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;//默认是可写的
    this.shared = false;
  }
  /**当前Segment从双链表中出队**/
  public Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }
  /**插入一个新的Segment到当前的Segment的后面**/
   public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }
    /**
    *把当前的Segment分成两个Segment。
    *使用场景 把当前Segment A写入到Segment B中。将设A中数据的大小是2KB(记得容量是8KB)
    *B中的数据是7KB(剩余空间1KB),这样A往B中写数据,肯定是写不完的,需要把A分成
    *A1(新建的Segment)和A(原来的A,需要更新pos)
    **/
   public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;
    if (byteCount >= SHARE_MINIMUM) {
    //如果写入的数据超过1kb 新建一个只读的Segment,避免arrayCopy
      prefix = new Segment(this);
    } else {
    //从SegmentPool中拿一个Segment
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    //插入到当前Segment A的前面 A1->A
    prev.push(prefix);
    return prefix;
  }
  /**对多个Segment的空间做压缩,用来HashSource,HashSink,GzipSource,
  * GzipSink(还怕别人问你Gzip在OkHttp中的实现原理吗)
  **/
  public void compact() {
    //如果只有一个Segment 不需要压缩
    if (prev == this) throw new IllegalStateException();
    // 如果前面的Segment是只读的,没法压缩
    if (!prev.owner) return; 
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    //两个Segment的总大小不大于8kb 可以合并成一个,否则返回
    if (byteCount > availableByteCount)
    //把当前的Segment的数据写入到前面的Segment中
    writeTo(prev, byteCount);
    //当前的Segment出队列
    pop();
    //回收Segment
    SegmentPool.recycle(this);
  }
  /**写byteCount个数据到sink中**/
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}

总结下Segment的知识


  • Segment其实就是个byte[]
  • Segemnt记录了byte的读写指针pos和limit
  • Segment维护一个双链表


接下来我们来分析 Segment tail = sink.writableSegment(1)

Buffer.java


Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    //如果buffer 还没有初始化,从对象池拿一个Segment,同时初始化双链表
    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    //拿到双链表的最后一个Segment
    Segment tail = head.prev;
    //判断最后tail的空间够不够,tail是不是只读的Segment
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    //如果空间不够,或者是只读的 重新拿一个Segment放入到链表尾部
      tail = tail.push(SegmentPool.take()); 
    }
    return tail;
  }

skink.writeableSegment(1)的功能就是,从Buffer 的Segment链表中取到链表最后一个Segment,这个Segment需要满足两个条件1.可写 2.可写空间大于1个字节


到这里咱们基本上把int bytesRead = in.read(tail.data, tail.limit, maxToCopy)和Segment tail = sink.writableSegment(1)讲解清楚了。那么我们再重新看下Okio.source(InputStream in,Timeout timeOut) return new Source()代码块的read方法


@Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
        //支持超时读取
          timeout.throwIfReached();
          //拿到buffer中链表的最后一个可写的Segment
          Segment tail = sink.writableSegment(1);
          //获取最大能往tail中写多少个字节
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          //计算往Segment写了多少数据(为什么是写,对buffer来说就是写)
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          //更新写的位置
          tail.limit += bytesRead;
          //增加buffer的数据总量
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }


总结下Okio.source(InputStream in, Timeout timeout)


从Buffer(sink)中找到链表中最后一个可写的并且还有写入空间的Segment记做tail

判断最多能写多少数据到Buffer(sink)中记做maxToCopy

从InputStream(in)中读取maxToCopy个数据到tail中


根据第三条的结论来看,比如你调用了soure.read(buffer,10*1024),那其实返回的肯定是比 10*1024少。举例说明,拿前面的helloworld举例。现在我从网络粘贴了 老罗android开发之旅的一篇文章到helloword.txt里并重复了3遍。文件大小为35243个字节


try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            source.read(buffer, file.length());
            System.out.println(buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

执行输出结果如下

file.length 35243
buffer size 8192
• 1
• 2
• 3

如果需要正确的把所有数据都写入到buffer中就需要用while循环了


 try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            long fileLength  = file.length();
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            while (fileLength!=0) {
                long hasRead = source.read(buffer, file.length());
                fileLength-=hasRead;
            }
            System.out.println("buffer size "+buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }


执行输出结果如下

file.length 35243
buffer size 35243


好了,至此Source基本上讲解完毕,接下来讲解Sink,老规矩还是从Okio.sink(OutputStream out,Timeout timeout)讲起

2.2 Okio.sink(final OutputStream out, final Timeout timeout)

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Sink() {
    //把Buffer中的数据写入到sink中
      @Override public void write(Buffer source, long byteCount) throws IOException {
      //检查buffer中的数据数量是否合法(如果buffer数量<byteCount就不合法)
        checkOffsetAndCount(source.size, 0, byteCount);
        //自带while循环,直到把buffer中的数据耗尽
        while (byteCount > 0) {
          timeout.throwIfReached();
          //从buffer的第一个Segment开始
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);
          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;
        //如果当前Segment写完了出队列
          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }
      @Override public void flush() throws IOException {
        out.flush();
      }
      @Override public void close() throws IOException {
        out.close();
      }
      @Override public Timeout timeout() {
        return timeout;
      }
      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };

总结下Okio.sink(OutputStream out,Timeout timeout)


检查Buffer的size 和readCount是否合法


获取Buffer中的Head Segment,把Segment中的数据写入到OutputStream,如果当前Segment数据写完了,Segment出队列,并放回对象池


判断数据是否写完,如果没写完,重复第二部

4. BufferedSource BufferedSink源码

Buffered.png

BufferedSource、BufferedSink 与Source和Sink的区别如下


BufferedSource、BufferedSink内部维护了一个Buffer对象


BufferedSource、BufferedSink内部分别引用了Source、Sink对象


BufferedSource的read(Buffer sink,long byteCount)使用内部的Source的read(Buffer sink,long byteCount)方法


BufferedSink的write(Buffer source,long byteCount)使用内部的Sink的write(Buffer source,long byteCount)方法


BufferedSource、BufferedSink内部扩展了很多readXX方法如 readByte/writeByte、readInt/writeInt等等


关于BufferedXX系列的源码可能需要再写一篇文章详细讲解。不过也是挺简单的。如果你看懂了本文自行分析BufferedXX应该是不在话下


5. 扩展Source Sink

Okio内部有不少已实现的Source和Sink。例如GzipSource/GzipSink、HashingSource/HashingSink。至于源码分析,请读者自行分析。


6. 接下来要做的事情

Okio是OkHttp IO操作的基石。接下来我们将带着Okio的学习成果进入OkHttp源码分析


相关文章
|
设计模式 缓存 监控
OKHttp3 从使用到原理分析
Okhttp3 是我们经常使用的一个网络框架,可扩展性强,支持 get 缓存, spdy、http2.0,gzip 压缩减少数据流量,同步和异步请求,连接池复用机制等特性让广大 android 开发者深爱不已,今天我就带大家从 Okhttp 简单使用,到各种好用拦截器原理了解 Okhttp3
1791 0
OKHttp3 从使用到原理分析
|
安全 Java API
OkHttp官方教程
OkHttp官方教程
|
缓存 Java 开发工具
Android开发之OkHttp介绍
Android开发之OkHttp介绍
184 0
Android开发之OkHttp介绍
|
存储 缓存 网络协议
源码阅读 | Okhttp
源码阅读 | Okhttp
|
缓存 JSON 数据格式
OkHttp3源码详解(一) Request类
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680每一次网络请求都是一个Request,Request是对url,method,header,body的封装,也是对Http协议中请求行,请求头,实体内容的封装 p.
|
设计模式 Java API
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )
346 0
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )
|
Java 调度 安全
OkHttp3源码详解(六)Okhttp任务队列工作原理
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/474380680 1 概述 1.1 引言 android完成非阻塞式的异步请求的时候都是通过启动子线程的方式来解决,子线程执行完任务的之后通过handler的方式来和主线程来完成通信。
OkHttp3源码详解(三) 拦截器
阿里P7移动互联网架构师进阶视频(每日更新中)免费学习请点击:https://space.bilibili.com/4743806801.构造Demo 首先构造一个简单的异步网络访问Demo: 1. OkHttpClient client = new OkHttpClient(); 2.
Okhttp3源码解析(1)-OkHttpClient分析
前言 上篇文章我们讲了Okhttp的基本用法,今天根据上节讲到请求流程来分析源码,那么第一步就是实例化OkHttpClient对象,所以我们今天主要分析下OkHttpClient源码! 初始化-构造方式 创建 OkHttpClient实例的两种方式 1.
2870 0
|
JSON Android开发 数据格式
Okhttp3-基本用法
前言 Okhttp官网Okhttp-Github android网络框架之OKhttp一个处理网络请求的开源项目,是安卓端最火热的轻量级框架,由移动支付Square公司贡献(该公司还贡献了Picasso) 用于替代HttpUrlConnection和Apache HttpClient(android API23 6.0里已移除HttpClient) 官网的解释如下: 基本用法 1.集成 1.1.依赖 implementation 'com.squareup.okhttp3:okhttp:3.11.0' 可以去Okhttp-Github 查看并依赖最新的版本。
3140 0