OkHttp源码详解之Okio源码详解

简介: OkHttp源码详解之Okio源码详解

Okio


1. Okio简介

引用官方的一段介绍


Okio是一个补充java.io和java.nio的库,使访问,存储和处理数据变得更加容易。 它最初是作为Android中包含的功能强大的HTTP客户端OkHttp的一个组件。 它运作良好,随时准备解决新问题。


2. 从HelloWorld开始

我们知道,在java.io中InputStream和OutputStream分别代表了输入流,和输出流。相应的在Okio中Source和Sink分别代表了输入流和输出流。接下来我们分别用java.io和Okio实现打印文本内容功能


假设有个文件helloworld.txt文件内容如下

Hello World!
Hello World!

java.io实现打印功能

try {
    File file = new File("helloworld.txt");
    FileInputStream fileInputStream = new FileInputStream(file);//1
    byte[] buffer = new byte[(int) file.length()];//2
    fileInputStream.read(buffer);//3
    System.out.write(buffer);//4
} catch (Exception e) {
    e.printStackTrace();
}

Okio实现打印功能

try {
    File file = new File("helloworld.txt");
    Source source = Okio.source(file);//a
    Buffer buffer = new Buffer();//b
    source.read(buffer, file.length());//c
    Sink sink = Okio.sink(System.out);
    sink.write(buffer, file.length());//d
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

上面两段代码实现的功能都是一样的,实现思路总结如下


1. 获取文件的输入流 //1和//a处实现

2. 将文件输入流读取到缓冲区 //3和//c处实现

3. 将缓冲区的数据写入到Sytem.out流中 //4和//d处

okio示意图.png

3.Source Sink源码讲解


1. Okio Source Sink


从上面的例子我们可以把Source想象成InputStream,把Sink想象成OutputStream。通过下面的图片,我们来看下Source和Sink的定义

Okio Source Sink.png


Source通过read(Buffer sink,long byteCount)方法,把磁盘,网络,或者内存中的输入流的数据读取到内存的Buffer中。


Sink刚好相反,它通过write(Buffer source,long byteCount)方法把内存Buffer中的数据写入到输出流中。


Okio中定义了生成Source的静态方法,source(File file)、source(InputStream in)、source(InputStream in,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Response。这很重要是OkHttp IO操作的核心。这四个重载方法真正的实现是在source(InputStream in,Timeout timeout)中


Okio中定义了生成Sink的静态方法,sink(File file)、sink(OutputStream out)、source(OutputStream out,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用来操作网络请求的Request。同样这也是OkHttp IO操作的核心。这四个重载方法真正的实现是在sink(OutputStream out,Timeout timeout)中


2. Source Sink的创建

2.1 Okio source(InputStream in,Timeout timeout)

private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }
      @Override public void close() throws IOException {
        in.close();
      }
      @Override public Timeout timeout() {
        return timeout;
      }
      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法

read
public int read(byte[] b,
       int off,
       int len)
         throws IOException
Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
This method blocks until input data is available, end of file is detected, or an exception is thrown.

仔细看一眼代码,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看着眼熟,其它的代码如Segment tail = sink.writableSegment(1);初学者表示很懵逼呀。实话告诉各位,整个Okio的精髓就在这两行代码里。这才叫四两拨千斤。好吧,让我们来重温一下InputStream的read(byte[] b, int off, int len)方法

read
public int read(byte[] b,
       int off,
       int len)
         throws IOException
Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
This method blocks until input data is available, end of file is detected, or an exception is thrown.

翻译如下:从输入流中读取len个字节到字节数组b中。这个方法可能会被阻塞,直到输入流数据可用。


如此说来tail.data应该是个byte[],tail.limit是读取流的起始位置,maxToCopy是要读取的字节的长度了。没错是这样的。


Segment tail = sink.writableSegment(1);这句代码目前我还是看不懂啊,Segment是什么呀?它和Buffer之间的关系是什么呀?上图!一图抵千言

OKIO Segment.png

OKIO Buffer.png

虽然说一图抵千言,还是做个简单的讲解吧。


Segment说白了就是byte[],每个绿色的或者白色的小方格代表一个byte。绿色表示已经有数据了,白色表示没有数据。pos指向第一个绿色的格子,表示读取数据的位置,limit指向第一个白色的格子,表示写入数据的位置。


Buffer是一个由Segment组成的双链表。每一个Segment最多可以容下8192个字节。在向Buffer的Segment写入数据时,如果超过了8192个字节,那么会从SegmentPool(一个对象池,最多可以容下8个Segment)拿一个Segment或者新建一个Segment(因为SegmentPool中的对象都被用光了)加入到双链表的尾端


接下来我们来分析下Segment源码,毕竟Talk Is Cheap,Show Me The Code。由于Segment代码还是比较简单的。所以我就在源码中加入注释来讲解

final class Segment {
  /** 每个Segment最大容量8KB */
  static final int SIZE = 8192;
  /** Segment分两种,只读和可写。当Segment需要被拆分成两个小的Segment的时候,如果被拆分
   * 出去的Segment的大小超过1024,那么那个Segment会被定义成只读的。(暂时不理解没关系)
  */
  static final int SHARE_MINIMUM = 1024;
  /**真正存储数据的byte数组**/
  final byte[] data;
  /** 读取数据的地方 参考前面的图片解释 */
  int pos;
  /** 写数据的地方 参考前面的图片解释 */
  int limit;
  /** 只读模式 */
  boolean shared;
  /** 可写模式 */
  boolean owner;
  /** 双链表的next指针 */
  Segment next;
  /** 双链表的prev指针 */
  Segment prev;
  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;//默认是可写的
    this.shared = false;
  }
  /**当前Segment从双链表中出队**/
  public Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }
  /**插入一个新的Segment到当前的Segment的后面**/
   public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }
    /**
    *把当前的Segment分成两个Segment。
    *使用场景 把当前Segment A写入到Segment B中。将设A中数据的大小是2KB(记得容量是8KB)
    *B中的数据是7KB(剩余空间1KB),这样A往B中写数据,肯定是写不完的,需要把A分成
    *A1(新建的Segment)和A(原来的A,需要更新pos)
    **/
   public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;
    if (byteCount >= SHARE_MINIMUM) {
    //如果写入的数据超过1kb 新建一个只读的Segment,避免arrayCopy
      prefix = new Segment(this);
    } else {
    //从SegmentPool中拿一个Segment
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }
    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    //插入到当前Segment A的前面 A1->A
    prev.push(prefix);
    return prefix;
  }
  /**对多个Segment的空间做压缩,用来HashSource,HashSink,GzipSource,
  * GzipSink(还怕别人问你Gzip在OkHttp中的实现原理吗)
  **/
  public void compact() {
    //如果只有一个Segment 不需要压缩
    if (prev == this) throw new IllegalStateException();
    // 如果前面的Segment是只读的,没法压缩
    if (!prev.owner) return; 
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    //两个Segment的总大小不大于8kb 可以合并成一个,否则返回
    if (byteCount > availableByteCount)
    //把当前的Segment的数据写入到前面的Segment中
    writeTo(prev, byteCount);
    //当前的Segment出队列
    pop();
    //回收Segment
    SegmentPool.recycle(this);
  }
  /**写byteCount个数据到sink中**/
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}

总结下Segment的知识


  • Segment其实就是个byte[]
  • Segemnt记录了byte的读写指针pos和limit
  • Segment维护一个双链表


接下来我们来分析 Segment tail = sink.writableSegment(1)

Buffer.java


Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    //如果buffer 还没有初始化,从对象池拿一个Segment,同时初始化双链表
    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    //拿到双链表的最后一个Segment
    Segment tail = head.prev;
    //判断最后tail的空间够不够,tail是不是只读的Segment
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    //如果空间不够,或者是只读的 重新拿一个Segment放入到链表尾部
      tail = tail.push(SegmentPool.take()); 
    }
    return tail;
  }

skink.writeableSegment(1)的功能就是,从Buffer 的Segment链表中取到链表最后一个Segment,这个Segment需要满足两个条件1.可写 2.可写空间大于1个字节


到这里咱们基本上把int bytesRead = in.read(tail.data, tail.limit, maxToCopy)和Segment tail = sink.writableSegment(1)讲解清楚了。那么我们再重新看下Okio.source(InputStream in,Timeout timeOut) return new Source()代码块的read方法


@Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
        //支持超时读取
          timeout.throwIfReached();
          //拿到buffer中链表的最后一个可写的Segment
          Segment tail = sink.writableSegment(1);
          //获取最大能往tail中写多少个字节
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          //计算往Segment写了多少数据(为什么是写,对buffer来说就是写)
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          //更新写的位置
          tail.limit += bytesRead;
          //增加buffer的数据总量
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }


总结下Okio.source(InputStream in, Timeout timeout)


从Buffer(sink)中找到链表中最后一个可写的并且还有写入空间的Segment记做tail

判断最多能写多少数据到Buffer(sink)中记做maxToCopy

从InputStream(in)中读取maxToCopy个数据到tail中


根据第三条的结论来看,比如你调用了soure.read(buffer,10*1024),那其实返回的肯定是比 10*1024少。举例说明,拿前面的helloworld举例。现在我从网络粘贴了 老罗android开发之旅的一篇文章到helloword.txt里并重复了3遍。文件大小为35243个字节


try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            source.read(buffer, file.length());
            System.out.println(buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

执行输出结果如下

file.length 35243
buffer size 8192
• 1
• 2
• 3

如果需要正确的把所有数据都写入到buffer中就需要用while循环了


 try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            long fileLength  = file.length();
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            while (fileLength!=0) {
                long hasRead = source.read(buffer, file.length());
                fileLength-=hasRead;
            }
            System.out.println("buffer size "+buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }


执行输出结果如下

file.length 35243
buffer size 35243


好了,至此Source基本上讲解完毕,接下来讲解Sink,老规矩还是从Okio.sink(OutputStream out,Timeout timeout)讲起

2.2 Okio.sink(final OutputStream out, final Timeout timeout)

private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Sink() {
    //把Buffer中的数据写入到sink中
      @Override public void write(Buffer source, long byteCount) throws IOException {
      //检查buffer中的数据数量是否合法(如果buffer数量<byteCount就不合法)
        checkOffsetAndCount(source.size, 0, byteCount);
        //自带while循环,直到把buffer中的数据耗尽
        while (byteCount > 0) {
          timeout.throwIfReached();
          //从buffer的第一个Segment开始
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);
          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;
        //如果当前Segment写完了出队列
          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }
      @Override public void flush() throws IOException {
        out.flush();
      }
      @Override public void close() throws IOException {
        out.close();
      }
      @Override public Timeout timeout() {
        return timeout;
      }
      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };

总结下Okio.sink(OutputStream out,Timeout timeout)


检查Buffer的size 和readCount是否合法


获取Buffer中的Head Segment,把Segment中的数据写入到OutputStream,如果当前Segment数据写完了,Segment出队列,并放回对象池


判断数据是否写完,如果没写完,重复第二部

4. BufferedSource BufferedSink源码

Buffered.png

BufferedSource、BufferedSink 与Source和Sink的区别如下


BufferedSource、BufferedSink内部维护了一个Buffer对象


BufferedSource、BufferedSink内部分别引用了Source、Sink对象


BufferedSource的read(Buffer sink,long byteCount)使用内部的Source的read(Buffer sink,long byteCount)方法


BufferedSink的write(Buffer source,long byteCount)使用内部的Sink的write(Buffer source,long byteCount)方法


BufferedSource、BufferedSink内部扩展了很多readXX方法如 readByte/writeByte、readInt/writeInt等等


关于BufferedXX系列的源码可能需要再写一篇文章详细讲解。不过也是挺简单的。如果你看懂了本文自行分析BufferedXX应该是不在话下


5. 扩展Source Sink

Okio内部有不少已实现的Source和Sink。例如GzipSource/GzipSink、HashingSource/HashingSink。至于源码分析,请读者自行分析。


6. 接下来要做的事情

Okio是OkHttp IO操作的基石。接下来我们将带着Okio的学习成果进入OkHttp源码分析


相关文章
|
缓存
SpringCloud Gateway 网关的请求体body的读取和修改
SpringCloud Gateway 框架中,为了处理请求体body,实现多次读取与修改,创建了一个名为`RequestParamGlobalFilter`的全局过滤器。这个过滤器使用`@Component`和`@Slf4j`注解,实现了`GlobalFilter`和`Ordered`接口,设置最高优先级以首先读取body。它通过缓存请求体并创建装饰过的`ServerHttpRequest`来实现body的动态获取。
2073 4
|
存储 分布式计算 大数据
大数据之路:阿里巴巴大数据实践——大数据领域建模综述
数据建模解决数据冗余、资源浪费、一致性缺失及开发低效等核心问题,通过分层设计提升性能10~100倍,优化存储与计算成本,保障数据质量并提升开发效率。相比关系数据库,数据仓库采用维度建模与列式存储,支持高效分析。阿里巴巴采用Kimball模型与分层架构,实现OLAP场景下的高性能计算与实时离线一体化。
|
SQL 关系型数据库 数据库
学习分布式事务Seata看这一篇就够了,建议收藏
学习分布式事务Seata看这一篇就够了,建议收藏
19771 2
|
存储 监控 前端开发
Sentry 监控部署与使用(详细流程)
Sentry 监控部署与使用(详细流程)
14036 1
|
11月前
|
存储 数据采集 大数据
数据仓库建模规范思考
本文介绍了数据仓库建模规范,包括模型分层、设计、数据类型、命名及接口开发等方面的详细规定。通过规范化分层逻辑、高内聚松耦合的设计、明确的命名规范和数据类型转换规则,提高数据仓库的可维护性、可扩展性和数据质量,为企业决策提供支持。
922 10
Sentinel学习圣经:从入门到精通 Sentinel,最全详解 (40+图文全面总结)
尼恩给大家做一下系统化、体系化的梳理,联合社群小伙伴,来一个Sentinel学习圣经:从入门到精通Sentinel。
|
消息中间件 分布式计算 Hadoop
实时计算 Flink版操作报错合集之使用flink jar开发,报错:找不到main方法,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
消息中间件 缓存 Java
RocketMQ消息发送常见错误与解决方案
RocketMQ消息发送常见错误与解决方案
RocketMQ消息发送常见错误与解决方案
|
存储 数据可视化 前端开发
数仓常用分层与维度建模
本文介绍了数据仓库的分层结构和维度建模。数仓通常分为ODS、DIM、DWD、DWS和ADS五层,各层负责不同的数据处理阶段。维度建模是数据组织方法,包括星型和雪花模型。星型模型简单直观,查询性能高,适合简单查询;雪花模型则通过规范化减少冗余,提高数据一致性和结构复杂性,但可能影响查询效率。选择模型需根据业务需求和数据复杂性来定。
2799 0
|
存储 Java 分布式数据库
HBase构建图片视频数据的统一存储检索
HBase构建图片视频数据的统一存储检索