1. 请大家思考几个问题
在开始本文之前,请大家思考如下几个问题。并请大家带着这几个问题,去本文寻找答案。如果你对下面几个问题的答案了如指掌那本文可以略过不看
在浏览器中输入一个网址,按回车后发生了什么?
Okhttp的TCP连接建立发生在什么时候?
Okhttp的Request请求发生在什么时候?
Okio与Okhttp是在什么时候发生关联的?
Okhttp的Interceptor和Chain是怎么串联起来的?
Okhttp同步请求和异步请求分别是怎么实现的。异步请求有限制吗?
本文将围绕以上6个问题对Okhttp做一个简单的讲解,如有遗漏的知识点,欢迎在评论区指出,一起探讨成长。
2. Http请求的流程
首先来回答第一个问题“在浏览器中输入一个网址,按回车后发生了什么?”。Http权威指南一书给出的答案是发生了7件事情
浏览器从url中解析出主机名
浏览器将服务器的主机名转换成服务器的IP地址
浏览器将端口号从url中解析出来
浏览器建立一条与Web服务器的TCP连接
浏览器向服务器发送一条Http请求报文
服务器向浏览器回送一条Http响应报文
关闭连接,浏览器显示文档
以上七步是每一个Http请求必须要做的事情,Okhttp库要实现Http请求也不例外。这七个步骤的每一步,在Okhttp中都有体现。
HttpUrl类负责步骤1和步骤3的主机名和端口解析
Dns接口的具体实现负责步骤2的实现
RealConnection类就是步骤4中的那条TCP连接
CallServerInterceptor拦截器的intercept方法负责步骤5和步骤6的发送报文和接收报文
ConnectPool连接池中提供了关闭TCP连接的方法
Okio在哪操作?当然是往Socket的OutputStream写请求报文和从Socket的InputStram读取响应报文了
至此文章开头提出的6个问题,前5个都已经有了一个简单的回答。至于第六个问题,异步在Okhttp中用的缓存线程池,理论上缓存线程池,是当有任务到来,就会从线程池中取一个空闲的线程或者新建线程来处理任务,而且缓存线程池的线程数是Integer.MAX_VALUE。理论上是没有限制的。但是Dispatcher类在线程池的基础上,做了强制限制,最多可以同时处理的网络请求数64个,对于同一个主机名,最多可以同时处理5个网络请求。接下来我带大家从源码的角度来寻找这几个问题的答案
3.初识Okhttp
首先我们来写两个简单的例子来使用Okhttp完成get和post请求。
同步get请求
OkHttpClient client = new OkHttpClient(); String run(String url) throws IOException { Request request = new Request.Builder() .url(url) .build(); Response response = client.newCall(request).execute(); return response.body().string(); }
同步post请求
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); OkHttpClient client = new OkHttpClient(); String post(String url, String json) throws IOException { RequestBody body = RequestBody.create(JSON, json); Request request = new Request.Builder() .url(url) .post(body) .build(); Response response = client.newCall(request).execute(); return response.body().string(); }
异步get请求
OkHttpClient client = new OkHttpClient(); void run(String url) throws IOException { Request request = new Request.Builder() .url(url) .build(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { System.out.println(response.body().string()); } }); }
异步post请求
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); OkHttpClient client = new OkHttpClient(); void post(String url, String json) throws IOException { RequestBody body = RequestBody.create(JSON, json); Request request = new Request.Builder() .url(url) .post(body) .build(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } }); }
Okhttp完成一次网络请求,过程如下
创建一个OkHttpClient对象client
创建一个Request对象request,并设置url
调用client.newCall(request)生成一个Call对象call
调用call.execute()[同步调用]或者call.enqueue(callback)[异步调用]完成网络请求
4. 同步和异步网络请求过程
同步调用过程如下
OkHttpClient.newCall(Request r) => RealCall.execute() => RealCall.getResponseWithInterceptorChain()
异步调用过程如下
OkHttpClient.newCall(Request r) => RealCall.enqueue(Callback) => Dispatcher.enqueue(AsyncCall) => 线程池执行AsyncCall =>AsyncCall.execute() => RealCall.getResponseWithInterceptorChain()
从上图可以看出异步调用比同步调用步骤更长,也更复杂。在这里我把整个调用过程分成两个阶段,RealCall.getResponseWithInterceptorChain()称作“网络请求执行阶段”,其余部分称作“网络请求准备阶段”。由于“网络请求执行阶段”涉及到链式(Chain)调用以及各种拦截器的执行比“网络请求准备阶段”复杂太多。所以我们先来看“网络请求准备阶段”,这个阶段也需要分成同步和异步两种方式来讲解
首先我们来看下准备阶段的公共调用部分OkHttpClient.newCall(Request r)
@Override public Call newCall(Request request) { return new RealCall(this, request, false /* for web socket */); }
Call是什么,在我看来Call可以理解成是对网络请求封装,它可以执行网络请求,也可以取消网络请求。我们看下Call的类定义
public interface Call extends Cloneable { Request request(); Response execute() throws IOException; void enqueue(Callback responseCallback); void cancel(); boolean isExecuted(); boolean isCanceled(); Call clone(); interface Factory { Call newCall(Request request); } }
request()返回的Request对象
execute()同步执行网络操作
enqueue(Callback responseCallback)异步执行网络操作
cancel()取消网络操作
RealCall是Call的实现类。我们重点来看下execute()和enqueue(Callback responseCallback)
execute()方法
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { //这里只是把call存到Dispatcher的列表中 client.dispatcher().executed(this); //真正执行网络操作 Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } }
enqueue(Callback callback)
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
我们注意到AsyncCall类,首先它是RealCall类的非静态内部类,它持有RealCall的对象,它可以做任何RealCall能做的事情。同时它继承自NamedRunnable。NamedRunnable是Ruannable的子类。它主要有两个功能,1.可以被线程池执行 2.修改当前执行线程的线程名(方便debug)。由于这几个类都比较简单,而且篇幅有限,这里就不贴代码了。请自行查阅
当然到这里异步执行的准备阶段并没有结束,它是怎么被子线程执行的呢。这里我们就需要在Dispatcher类中寻找答案了。Dispatcher是干嘛用的,它的功能就是负责分发用户创建的网络请求,以及控制网络请求的个数,以及上一个网络请求结束后,要不要执行等待中的网络请求。下面我们来看下Dispatcher都有哪些成员变量,以及这些成员变量的作用
public final class Dispatcher { //最多可以同时请求数量 private int maxRequests = 64; //每个host最大同时请求数量 private int maxRequestsPerHost = 5; //当没有任务执行的回调 比如说执行10个任务,10个任务都执行完了会回调该接口 private Runnable idleCallback; /** Executes calls. Created lazily. */ //线程池 用的是缓存线程池(提高吞吐量,并且能及时回收无用的线程) private ExecutorService executorService; /** Ready async calls in the order they'll be run. */ //等待的异步任务队列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); //正在执行的异步任务队列 这个放的是Runnable /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ //正在执行的同步任务队列 这个存储的是RealCall private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } }
总结下几个比较重要的概念
executorService 线程池并且是缓存线程池,最最大程度提高吞吐量,并且能回收空闲的线程
Deque readyAsyncCalls异步请求等待队列,当异步请求数超过64个,或者单个主机名请求超过5个,网络任务(AsyncCall)会放到该队列里。当任务执行完毕,会调用promoteCalls()把满足条件的网络任务(AsyncCall)放到线程池中执行
Deque runningAsyncCalls 正在执行的异步任务队列
Deque runningSyncCalls 正在执行的同步任务队列
紧接着我们来看下Dispatcher的enqueue(AsyncCall call)
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { //如果请求没有超过限制 用线程池执行网络请求 runningAsyncCalls.add(call); executorService().execute(call); } else { //如果超过了限制,把请求放到异步等待队列中去,什么时候被唤醒?finished后 readyAsyncCalls.add(call); } }
通过线程池执行AsyncCall 最终调用的是AsyncCall的run(),而run()中调用的是AsyncCall的execute()
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
在execute()中调用了getResponseWithInterceptorChain(),也就是我们前面说的“网络请求执行阶段”。至此,同步和异步网络请求的“网络请求准备阶段”就讲解完了,接下来我们讲解“网络请求执行阶段”
5. Interceptor和Chain
Response getResponseWithInterceptorChain() throws IOException { List<Interceptor> interceptors = new ArrayList<>(); //用户自定义的拦截器 interceptors.addAll(client.interceptors()); //重试和重定向拦截器 interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); //缓存拦截器 interceptors.add(new CacheInterceptor(client.internalCache())); //连接拦截器 interceptors.add(new ConnectInterceptor(client)); //用户自定义的网络拦截器 if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //真正请求服务器的拦截器 interceptors.add(new CallServerInterceptor(forWebSocket)); //用Chain把List中的interceptors串起来执行 Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }
要理解上面的代码。我们需要搞清楚两个概念 Interceptor和Chain。我们先来看下它们的定义
public interface Interceptor { Response intercept(Chain chain) throws IOException; interface Chain { Request request(); Response proceed(Request request) throws IOException; Connection connection(); } }
可以举一个比较形象的例子来解释它们的关系。就以我们IT行业来讲。老板说想要一个APP,然后把需求转化成任务流转给产品经理,产品经理构思了下,然后把任务流转给设计师,设计师一通设计之后,把设计图流转给码农,码农接到任务后就开始加班加点的编码,最终写出了APP,然后把APP交给设计师,让设计师查看界面是否美观,设计师再把APP流转给产品经理,产品经理觉得很满意,最终把APP交付给老板。老板看了很满意,说大家伙晚上加个鸡腿。虽然实际生产中并不是这样的一个流程,想了很久觉得还是这样讲,更好理解。对应到这个例子中,产品经理、设计师、程序员他们都是真正干活的家伙,他们对应的是Interceptor。Chain是什么,是老板吗?不是!!Chain只是一套规则,对应的就是例子里的流转流程。
对照图片,“网络请求执行阶段”会依次执行Interceptors里的Interceptor。Interceptor执行分成三步。第一步:处理请求 第二步:执行下个Interceptor 第三步:处理响应
List<Interceptor> interceptors = new ArrayList<>(); //用户自定义的拦截器 interceptors.addAll(client.interceptors()); //重试和重定向拦截器 interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); //缓存拦截器 interceptors.add(new CacheInterceptor(client.internalCache())); //连接拦截器 interceptors.add(new ConnectInterceptor(client)); //用户自定义的网络拦截器 if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //真正请求服务器的拦截器 interceptors.add(new CallServerInterceptor(forWebSocket));
首先执行的就是retryAndFollowUpInterceptor
RetryAndFollowUpInterceptor
//第一部分 根据url解析出主机名 解析端口 Request request = chain.request(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url()), callStackTrace); ....省略 while(true){ .... //第二部分处理下一个Interceptor response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); releaseConnection = false; .... //第三部分 处理响应,如果是重定向,用while循环,重新处理下一个Interceptor if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } closeQuietly(response.body()); if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } }
BridgeInterceptor和CacheInterceptor请读者自行分析
ConnectInterceptor中会解析主机DNS并且建立TCP连接,Socket的输入输出流通过Source和Sink处理
public final class ConnectInterceptor implements Interceptor { public final OkHttpClient client; public ConnectInterceptor(OkHttpClient client) { this.client = client; } @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); boolean doExtensiveHealthChecks = !request.method().equals("GET"); //这里会解析DNS并且建立TCP连接,Socket的输入输出流会和Okio的Source和Sink关联 HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection); } }
CallServerInterceptor 通过httpCodec的sink向socket发送请求,并且通过httpCodec的openResponseBody把socket的输入流写入到Response中
@Override public Response intercept(Chain chain) throws IOException { HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream(); StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation(); Request request = chain.request(); long sentRequestMillis = System.currentTimeMillis(); httpCodec.writeRequestHeaders(request); Response.Builder responseBuilder = null; if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 // Continue" response before transmitting the request body. If we don't get that, return what // we did get (such as a 4xx response) without ever transmitting the request body. if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { httpCodec.flushRequest(); responseBuilder = httpCodec.readResponseHeaders(true); } // Write the request body, unless an "Expect: 100-continue" expectation failed. if (responseBuilder == null) { Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength()); BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); } } httpCodec.finishRequest(); if (responseBuilder == null) { responseBuilder = httpCodec.readResponseHeaders(false); } Response response = responseBuilder .request(request) .handshake(streamAllocation.connection().handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); int code = response.code(); if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(httpCodec.openResponseBody(response))//真正把body写进去 .build(); } if ("close".equalsIgnoreCase(response.request().header("Connection")) || "close".equalsIgnoreCase(response.header("Connection"))) { streamAllocation.noNewStreams(); } if ((code == 204 || code == 205) && response.body().contentLength() > 0) { throw new ProtocolException( "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); } return response; }
6. 结束语
OkHttp库还是挺庞大的,涉及到很多Http的基础知识。这里只是讲解了OkHttp的一小部分。很多细节的东西也没有深入讲解。比如说Socket的建立,连接池的管理,HttpCodec如何解析输入输出流,路由的细节。希望有机会可以再深入的讲解一番