OKHttp3源码解析系列
本文基于OkHttp3的3.11.0版本
implementation 'com.squareup.okhttp3:okhttp:3.11.0'
OkHttp3发起请求方式
我们用OkHttp3发起一个网络请求一般是这样:
首先要构建一个OkHttpClient
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.writeTimeout(20, TimeUnit.SECONDS)
.readTimeout(20, TimeUnit.SECONDS);
mOkHttpClient = builder.build();
然后构建Request
Request request = new Request.Builder()
.url(url)
.build();
异步请求
mOkHttpClient.newCall(request).enqueue(callback);
同步请求
mOkHttpClient.newCall(request).execute();
以上是简略的用OkHttp3请求网络的步骤,下面我们来通过源码分析下。
OkHttp3源码分析
我们先来看看OkHttp的newCall方法
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
可以看见返回的RealCall,所以我们发起请求无论是调用execute方法还是enqueue方法,实际上调用的都是RealCall内部的方法。
//RealCall.class
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在RealCall内部的enqueue方法和execute方法中,都是通过OkHttpClient的任务调度器Dispatcher来完成。下面我们先来看看Dispatcher。
任务调度器Dispatcher
Dispatcher中有一些重要的变量
//最大并发请求数
private int maxRequests = 64;
//每个主机的最大请求数
private int maxRequestsPerHost = 5;
//这个是线程池,采用懒加载的模式,在第一次请求的时候才会初始化
private @Nullable ExecutorService executorService;
//将要运行的异步请求任务队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在运行的异步请求任务队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在运行的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
我们再看看Dispatcher的构造方法
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
(1)可以看到Dispatcher有2个方法,我们如果需要用自己的线程池,可以调用带有线程池参数的构造方法。
(2)Dispatcher中的默认构造方法是个空实现,线程池的加载方式采用的是懒加载,也就是在第一次调用请求的时候初始化。
(3)Dispatcher采用的线程池类似于CacheThreadPool,没有核心线程,非核心线程数很大,比较适合执行大量的耗时较少的任务。
(4)由于没有提供带有最大并发请求数和每个主机的最大请求数参数的构造方法,我们没办法修改这2个参数。
下面我们分别分析下同步请求和异步请求的流程
同步请求流程分析
我们在通过mOkHttpClient.newCall(request).execute()的方式发起同步请求时,实际上调用的是RealCall内部的execute方法。
//RealCall.class
@Override public Response execute() throws IOException {
...
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
RealCall内部的execute方法主要做了3件事:
(1)执行Dispacther的executed方法,将任务添加到同步任务队列中
//Dispatcher.class
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
(2)调用RealCall内部的 getResponseWithInterceptorChain 方法请求网络。
(3)无论最后请求结果如何,都会调用Dispatcher的finished方法,将当前的请求移出队列。
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//将请求移出队列
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//这里promoteCalls为false,所以不会调用promoteCalls方法
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
总结
综上流程分析来看,同步请求只是利用了Dispatcher的任务队列管理,没有利用Dispatcher的线程池,所以executed方法是在请求发起的线程中运行的。所以我们不能直接在UI线程中调用OkHttpClient的同步请求,否则会报“NetworkOnMainThread”错误。
异步请求流程分析
我们通过 mOkHttpClient.newCall(request).enqueue(callback) 的方式发起异步请求,实际上调用的是RealCall中的enqueue方法。
//RealCall.class
@Override
public void enqueue(Callback responseCallback) {
...
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
RealCall中的enqueue方法什么也没干,直接调用Dispatcher中的enqueue方法,不过将传入的Callback包装成了AsyncCall。AsyncCall是RealCall中的内部类。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
...
@Override
protected void execute() {
boolean signalledCallback = false;
try {
//请求网络
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
(1)AsyncCall实现了继承自NamedRunnable,而NamedRunnable实现了Runnable接口,在run方法中会调用execute方法,所以当线程池执行AsyncCall时,AsyncCall的execute方法就会被调用。
(2)AsyncCall的execute方法通过getResponseWithInterceptorChain方法请求网络得到Response,然后根据请求状态回调对应的方法。所以这些回调方法都是运行在线程池中的,不能直接更新UI。
(3)AsyncCall的execute方法无论请求结果如何,最后都会调用Dispatcher的finished方法。
//Dispatcher.class
void finished(AsyncCall call) {
//与同步任务的情况不同,这里promoteCalls参数为true
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//异步任务时,这里会调用promoteCalls方法
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//如果当前的请求没有超出每个主机的最大请求数
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
与同步请求调用finished方法不同的是,异步任务调用Dispatcher的finished方法时还会执行promoteCalls方法。promoteCalls方法主要就是从待执行的任务队列中取出一个任务加入到正在执行的任务队列,并调用线程池执行任务。
最后我们来看看Dispatcher中的enqueue方法
//Dispatcher.class
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
enqueue方法比较简单,如果当前正在执行的任务数量还没达到最大数量并且当前请求的任务所请求的主机对应的请求数没有超过最大阈值,就将当前任务加入正在执行的任务队列,并调用线程池执行,否则就将任务加入待执行的队列。
总结
(1)综上,异步请求利用了Dispatcher的线程池来处理请求。当我们发起一个异步请求时,首先会将我们的请求包装成一个AsyncCall,并加入到Dispatcher管理的异步任务队列中,如果没有达到最大的请求数量限制,就会立即调用线程池执行请求。
(2)AsyncCall执行的请求回调方法都是在线程池中调用的,所以我们不能直接更新UI,需要在回调方法中利用Handler切换线程。
欢迎关注我的微信公众号,和我一起每天进步一点点!