OkHttp3源码解析(一)之请求流程

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 带你从源码理解OkHttp3的原理

OKHttp3源码解析系列

本文基于OkHttp3的3.11.0版本

implementation 'com.squareup.okhttp3:okhttp:3.11.0'

OkHttp3发起请求方式

我们用OkHttp3发起一个网络请求一般是这样:

首先要构建一个OkHttpClient

OkHttpClient.Builder builder = new OkHttpClient.Builder()
        .connectTimeout(15, TimeUnit.SECONDS)
        .writeTimeout(20, TimeUnit.SECONDS)
        .readTimeout(20, TimeUnit.SECONDS);
mOkHttpClient = builder.build();

然后构建Request

Request request = new Request.Builder()
        .url(url)
        .build();

异步请求

mOkHttpClient.newCall(request).enqueue(callback);

同步请求

mOkHttpClient.newCall(request).execute();

以上是简略的用OkHttp3请求网络的步骤,下面我们来通过源码分析下。


OkHttp3源码分析

我们先来看看OkHttp的newCall方法

@Override 
public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

可以看见返回的RealCall,所以我们发起请求无论是调用execute方法还是enqueue方法,实际上调用的都是RealCall内部的方法。

//RealCall.class
@Override 
public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
}

@Override 
public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

在RealCall内部的enqueue方法和execute方法中,都是通过OkHttpClient的任务调度器Dispatcher来完成。下面我们先来看看Dispatcher。

任务调度器Dispatcher

Dispatcher中有一些重要的变量

//最大并发请求数
private int maxRequests = 64;
//每个主机的最大请求数
private int maxRequestsPerHost = 5;

//这个是线程池,采用懒加载的模式,在第一次请求的时候才会初始化
private @Nullable ExecutorService executorService;

//将要运行的异步请求任务队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

//正在运行的异步请求任务队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

//正在运行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

我们再看看Dispatcher的构造方法

public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
}

public Dispatcher() {
}

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

(1)可以看到Dispatcher有2个方法,我们如果需要用自己的线程池,可以调用带有线程池参数的构造方法。

(2)Dispatcher中的默认构造方法是个空实现,线程池的加载方式采用的是懒加载,也就是在第一次调用请求的时候初始化。

(3)Dispatcher采用的线程池类似于CacheThreadPool,没有核心线程,非核心线程数很大,比较适合执行大量的耗时较少的任务。

(4)由于没有提供带有最大并发请求数和每个主机的最大请求数参数的构造方法,我们没办法修改这2个参数。

下面我们分别分析下同步请求和异步请求的流程

同步请求流程分析

我们在通过mOkHttpClient.newCall(request).execute()的方式发起同步请求时,实际上调用的是RealCall内部的execute方法。

//RealCall.class
@Override public Response execute() throws IOException {
    ...
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
}

RealCall内部的execute方法主要做了3件事:

(1)执行Dispacther的executed方法,将任务添加到同步任务队列中

//Dispatcher.class
synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

(2)调用RealCall内部的 getResponseWithInterceptorChain 方法请求网络。

(3)无论最后请求结果如何,都会调用Dispatcher的finished方法,将当前的请求移出队列。

void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      //将请求移出队列  
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //这里promoteCalls为false,所以不会调用promoteCalls方法
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
}

总结

综上流程分析来看,同步请求只是利用了Dispatcher的任务队列管理,没有利用Dispatcher的线程池,所以executed方法是在请求发起的线程中运行的。所以我们不能直接在UI线程中调用OkHttpClient的同步请求,否则会报“NetworkOnMainThread”错误。


异步请求流程分析

我们通过 mOkHttpClient.newCall(request).enqueue(callback) 的方式发起异步请求,实际上调用的是RealCall中的enqueue方法。

//RealCall.class
@Override 
public void enqueue(Callback responseCallback) {
    ...
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

RealCall中的enqueue方法什么也没干,直接调用Dispatcher中的enqueue方法,不过将传入的Callback包装成了AsyncCall。AsyncCall是RealCall中的内部类。

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    ...

    @Override 
    protected void execute() {
      boolean signalledCallback = false;
      try {
        //请求网络
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}

(1)AsyncCall实现了继承自NamedRunnable,而NamedRunnable实现了Runnable接口,在run方法中会调用execute方法,所以当线程池执行AsyncCall时,AsyncCall的execute方法就会被调用。

(2)AsyncCall的execute方法通过getResponseWithInterceptorChain方法请求网络得到Response,然后根据请求状态回调对应的方法。所以这些回调方法都是运行在线程池中的,不能直接更新UI。

(3)AsyncCall的execute方法无论请求结果如何,最后都会调用Dispatcher的finished方法。

//Dispatcher.class
void finished(AsyncCall call) {
    //与同步任务的情况不同,这里promoteCalls参数为true
    finished(runningAsyncCalls, call, true);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //异步任务时,这里会调用promoteCalls方法
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
}

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      //如果当前的请求没有超出每个主机的最大请求数
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

与同步请求调用finished方法不同的是,异步任务调用Dispatcher的finished方法时还会执行promoteCalls方法。promoteCalls方法主要就是从待执行的任务队列中取出一个任务加入到正在执行的任务队列,并调用线程池执行任务。

最后我们来看看Dispatcher中的enqueue方法

//Dispatcher.class
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

enqueue方法比较简单,如果当前正在执行的任务数量还没达到最大数量并且当前请求的任务所请求的主机对应的请求数没有超过最大阈值,就将当前任务加入正在执行的任务队列,并调用线程池执行,否则就将任务加入待执行的队列。

总结

(1)综上,异步请求利用了Dispatcher的线程池来处理请求。当我们发起一个异步请求时,首先会将我们的请求包装成一个AsyncCall,并加入到Dispatcher管理的异步任务队列中,如果没有达到最大的请求数量限制,就会立即调用线程池执行请求。

(2)AsyncCall执行的请求回调方法都是在线程池中调用的,所以我们不能直接更新UI,需要在回调方法中利用Handler切换线程。



欢迎关注我的微信公众号,和我一起每天进步一点点!
AntDream
目录
相关文章
|
7天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
23 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
67 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
54 0
|
7天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
20天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
39 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
111 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
前端开发 JavaScript UED
axios取消请求CancelToken的原理解析及用法示例
axios取消请求CancelToken的原理解析及用法示例
94 0
|
1月前
|
敏捷开发 数据可视化 测试技术
解析软件项目管理:以板栗看板为例,其如何有效影响并优化软件开发流程
软件项目管理是一个复杂而重要的过程,涵盖了软件产品的创建、维护和优化。其核心目标是确保软件项目能够顺利完成,同时满足预定的质量、时间和预算目标。本文将深入探讨软件项目管理的内涵及其对软件开发过程的影响,并介绍一些有效的管理工具。

推荐镜像

更多