为了更好的使用OKHttp—架构与源码分析

简介: 为了更好的使用OKHttp—架构与源码分析

今儿个咱们就来看看到底okhttp内部是如何实现的,这篇文章咱从okhttp整体框架方面出发,解析okhttp的源码。

okhttp框架源码地址: github.com/square/okht…

如何使用 okhttp

OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().
        url("https://github.com/cozing").
        build();
Call call = client.newCall(request);
try {
    //1.同步请求方式
    Response response = call.execute();
    //2.异步请求方式
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.info("cozing", "交易失败");
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.info("cozing", "交易成功");
        }
    });
} catch (IOException e) {
    e.printStackTrace();
}

这是使用okhttp发送一个简单通信流程,其中包括同步请求和异步请求:

  1. 同步请求调用的方法是call.execute(),内部采用的是线程阻塞方式直接将结果返回到Response,后面咱们会详细讲解;
  2. 异步请求调用的方法是call.enqueue(Callback callback),该方法需要传入一个Callback等待结果回调的接口,交易结果在onFailure()(失败)onResponse()(成功)中返回。

那么,接下来咱们来看看okhttp的源码的整体流程。

整体架构流程图

接下来咱们将根据这个整体架构图来来看看okhttp的内部实现。

dd75ca830b281e718d5d8c3df1a6f5e.png

流程走读

创建OkHttpClient

首先创建OkHttpClient对象,OkHttpClient是okhttp框架的客户端,用于发送http请求(Requests)和读取交易返回数据(Responses)。官方建议使用单例创建OkHttpClient,即一个进程中只创建一次即可,以后的每次交易都使用该实例发送交易。这是因为OkHttpClient拥有自己的连接池和线程池,这些连接池和线程池可以重复使用,这样做利于减少延迟和节省内存,如果咱们每次发交易都创建一个OkHttpClient的话,将会浪费很多内存资源。

创建方式:

OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().
        url("https://github.com/cozing").
        build();
Call call = client.newCall(request);
复制代码

还有一种方式创建OkHttpCleint对象的方式:

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().
        url("https://github.com/cozing").
        build();
Call call = client.newCall(request);
复制代码

这两种创建方式内部实现,第一种采用构建者模式创建OkHttpClient对象,这种方式可以自定义Builder内部的每一个参数属性,第二种采用普通的创建实例方式创建一个OkHttpClient对象,内部创建了一个默认的Builder,因此这种方式使用默认Builder的内部属性。

创建Call对象

一个Call对象表示一次请求,每一次交易请求都会生产一个新的CallCall其实是一个接口对象,它的具体实现类是RealCall

Call的创建过程:

Request request = new Request.Builder().
        url("https://github.com/cozing").
        build();
Call call = client.newCall(request);

可以看到在创建Call对象时候传进去了一个Request对象,Request对象表示用户的交易请求参数,咱们看看它的内部实现:

public final class Request { final HttpUrl url; final String method; final Headers headers;
final @Nullable RequestBody body; final object tagi
private volatile CacheControl cacheControl; // Lazily initialized.
Request(Builder builder) { this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build(); this.body = builder.body;
this.tag = builder.tag ≠ null ? builder.tag : this;}

可以看到,里面包括:

  • url:交易请求地址;
  • method:请求方式,比如:get/post/http...等请求方式;
  • headers:请求头;
  • body:请求体;

咱们这时看看Call的创建方法client.newCall(request)的内部实现:

*Prepares the {@code request} to be executed at some point in the future. 
*/  
  @Override public call newcall(Request request) {  
  return new Realcall( client: this, request, t forWebSocket: false /* for web socket */);  

继续看RealCall()

final boolean forwebSocket;
// Guarded by this. 
private boolean executed; 
@RealCall(OkHttpClient client,Request originalRequest, boolean forWebSocket){
final EventListener.FactorveventListenerFactorv = client.eventListenerFactorv():
this.client = client; 
this.originalRequest = originalRequest; 
this.forWebSocket = forWebSocket;
  this.retryAndFollowUpInterceptor=newRetryAndFollowUpInterceptor(client,forWebSocket); 
//TODO(jwilson): this is unsafe publication and not threadsafe. 
this.eventListener = eventListenerFactory.create(this);

可以看到确实如上面咱们所说,最后创建的是RealCall对象。

请求交易

okhttp中提供了两种请求方式:一种是同步请求,第二种是异步请求。同步请求调用call.execute()方法,异步请求调用call.enqueue(Callback callback)方法,

在看两个请求方式的实现之前,咱们先来看okhttp中一个重要成员Dispatcher(调度器)

Dispatcher(调度器)

Dispatcher是okhttp的任务调度核心类,负责管理同步和异步的请求,管理每一个请求任务的请求状态,并且其内部维护了一个线程池用于执行相应的请求,Dispatcher的实现框架图:

c7d19f4c02f492008f60ccb6b7e3f92.png

Dispatcher内部维护了两个队列:

public final class Dispatcher { 
private int maxRequests = 64; 
private int maxRequestsPerHost = 5; 
private @Nullable Runnable idleCallback;  
/** Executes calls. Created lazilv. */  
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */ 
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDequeo();
/**Running asynchronous calls.Includes canceled calls that haven't finished yet. */ 
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque。(); 
/**Running synchronous calls.Includes canceled calls that haven't finished vet. */  
private final Deque<RealCall> runningSyncCalls = new ArrayDeque◇(); 

两个队列的作用可以看上面注释,那么为何要使用这两个队列呢?咱们可以这么理解:把Dispatcher当成生产者,把线程池当成消费者,当生产者生产的线程大于消费者所能承受的最大范围,就把未能及时执行的任务保存在readyAsyncCalls队列中,当时机成熟,也就是线程池有空余线程可以执行时,会调用promoteCall()这个方法把等待队列中的任务取出放到线程池中执行,并且把这个任务转移到runningAsyncCalls队列中去。

接下来咱们分别看看同步请求和异步请求的实现过程,并详细说一下他们是如何实现的。

同步交易请求

call.execute()实现方式:

@ @Override public Request request() { return originalRequest; h  
@Override public Response execute() throws I0Exception { synchronized(this){
if(executed)throw new IllegalStateException("Already Executed"); executed = true;
captureCallStackTrace();
try{调用调度器dispatcher的executed方法 client.dispatcher().executed( call:this);
Response result = getResponseWithInterceptorChain(); if(result = null) throw new I0Exception("Canceled"); return result;
finally{调用调度器dispatcher的finished方法 client.dispatcher().finished( call: this);
}
private void captureCallStackTrace(){
Object callstackTrace=Platform.get().getStackTraceForCloseable( closer: "response.bodv().close()") retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
final Dispatcher  dispatcher  
final @Nullable Proxy proxy;  
final List<Protocol> protocols; 
final list<ConnectionSpec>  connectionSpecs;

可以看到实际调用的是Dispatcher调度器的executed()方法,继续看Dispatcher的实现:

return result;
/**Used by {@code Call#execute} to signal it is in-flight. */ 
  synchronized void executed  Realcall call){ 
runningSyncCalls.add(call);

到此咱们可以知道这部分是将这次交易的请求RealCall存进了Deque队列,Deque是一个双向队列接口,Deque接口具有丰富的抽象数据形式,它支持从队列两端点检索和插入元素,在此不对其做过多讲解。

接下来看的client.dispatcher().finished(this),不管结果请求结果如何,都会调用finally中的client.dispatcher().finished(this)将本次请求从队列中移除。

接下来调用到getResponseWithInterceptorChain()方法:

Request request() { return originalRequest; }
@ RealCall get() { return RealCall.this; }  
@Override protected void  execute() 
boolean signalledCallback = false;  
trv { 
Response response = getResponseWithInterceptorChain();  
if(retryAndFollowUpInterceptor.isCanceled()) {  
signalledCallback = true; 
responseCallback.onFailure(call:RealCall.this, new IOException("Canceled"));  
else {  
signalledCallback = true; 
responseCallback.onResponse( call:RealCall.this, response);

这个方法是okhttp的实现精髓点之一,这部分咱先放一边,将会在异步请求中一起讲解。

异步交易请求

call.enqueue(Callback callback)实现方式:

@Override public void enqueue(Callback responseCallback){ 
synchronized(this){ 
if(executed)throw new IllegalStateException("Already Executed");  
executed = true;  
captureCallStackTrace();  
client.dispatcher().enqueue(new AsyncCall(responseCallback)); 

当调用此次异步请求方法的时候,内部是调用了调度器dispatcherequeue(new AsyncCall(responseCallback))方法,该方法需要传入一个AsyncCall的对象。

接下来看dispatcher的equeue(new AsyncCall(responseCallback))方法的实现:

*Seta callback to be invoked each time the dispatcher becomes idle (when the number of running* calls returns to zero).
*<p>Note: The time at which a {@linkplain Call call}is considered idle is different depending*on whether it was run{@linkplain Call#enqueue(Callback) asynchronously} or
*{@linkplainCall#execute)svnchronouslv}. Asvnchronous calls become idle after the
*{@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
*returned.Synchronouscallsbecomeidleonce{@link Call#execute() execute()} returns. This
*means that if youare doingsynchronous calls the network layer will not truly be idle until*every returned {@link Response} has been closed.*/
public synchronized voidsetIdlecallback(@Nullable RunnableidleCallback){ this.idleCallback = idleCallback; h
synchronized void enqueue(AsyncCall call){
if(runningAsyncCalls.size()<maxRequests6GrunningCallsForHost(call)< maxRequestsPerHost){ runningAsyncCalls.add(call);
executorService().execute(call); else {
readyAsyncCalls.add(call);

先判断当前运行中的请求数是否小于设定的最大请求数量,默认最大请求数是同时执行64个请求,并且判断当前运行中的共同主机的请求数量是否小于设定的最大请求数量,默认同一主机的请求数量最大值为5,当两者条件都成立的时候会调用executorService()execute(call)方法;两者中只要有一个条件不成立,就会调用redyAsncCalls.add(call)将表示此次请求的call对象存在readyAsyncCalls队列中,readyAsyncCalls表示已准备好并等待执行请求的队列,当有空闲网络请求线程时,会从该队列中取出并执行网络请求。

接下来看executorService().execute(call)

@public Dispatcher() {
}
public svnchronized ExecutorService executorService(){ if(executorService = null){
executorService =newThreadPoolExecutor(corePoolsize:0,Integer.MAX_VALUE, keepAliveTime: 60,TimeUnit.SECONDS
new SynchronousQueue<Runnable>(),Util.threadFactorv(name: "OkHttp Dispatcher", daemon: false)); 
  return executorService; 

可以看到调度器Dispatcher内部维护了一个ThreadPoolExecutor线程池,并直接将call对象传入线程池执行。

通过上图可以知道这个call的实现对象类型是AsyncCall,来看看内部实现:

final class AsyncCall extends NamedRunnable 
private final Callback responseCallback 
@AsyncCall(Callback responseCallback){  
super("0kHttp %s", redactedUrl());  
this.responseCallback = responseCallback;
@String host() { return originalRequest.url().host();}  
@ Request request() { return originalRequest; } 
@RealCall get() { return RealCall.this; } 
@Override protected void execute() {  
boolean signalledCallback = false;  
try { 
Response response = getResponseWithInterceptorChain();  
if(retryAndFollowUpInterceptor.isCanceled()){ 
  signalledCallback = true; 
responseCallback.onFailure(cal:RealCall.this, new IOException("Canceled")); 
  else {  
signalledCallback = true; 
responseCallback.onResponse( call: RealCall.this, response);  
catch (IOException e){  
if(signalledCallback) { 
// Do not signal the callback twice!  
Platform.get().log(INFo,message:"Callback failure for "+ toLoggableString(), e):  
else {  
responseCallback.onFailure( cal: RealCall.this,e);  
finally { 
client.dispatcher().finished( call: this);  

可以看到AsyncCallRealCall的一个内部类,继承自NamedRunnable,再看NamedRunnable

package okhttp3.internal; 
/** 
Runnable implementation which always sets its thread name.  
* 
public abstract class NamedRunnable implements Runnable { 
protected final string name;  
public NamedRunnable(String format,Object. args){ this.name = Util.format(format, args); }  
@Override public final void run(){  
String oldName = Thread.currentThread().getName();  
Thread.currentThread().setName(name); 
try { 
execute();  
finally{  
Thread.currentThread().setName(oldName);  
} 
protected abstract void execute();  

所以,AsyncCall就是一个Runnable的实现,用来开启一个线程,当网络请求线程池执行该线程的run()方法时,会调用AsyncCallexecute()的方法,最后在execute()方法内部调用了和上面咱们分析的同步请求方法一样的getResponseWithInterceptorChain()

getResponseWithInterceptorChain()/拦截器链

通过上面的分析咱们知道不管是同步请求还是异步请求,最后都会走getResponseWithInterceptorChain()方法,getResponseWithInterceptorChain()是okhttp中的精髓设计之一,那么现在咱们来看看这个方法的内部实现:

*Returnsa string that describes this call.Doesn't include a full URL as that might contain  
* sensitive information.  
  */  
  @ String toLoggableString() 
return (isCanceled() ? "canceled ":"")  
+(forWebSocket ? "web socket" : "call") 
“ :"+redactedurl(); 
String redactedurl(){ return originalRequest.url().redact();} 
Response getResponseWithInterceptorChain() throws IOException f 
// Build a full stack of interceptors.  
List<Interceptor> interceptors = new ArrayListo(); interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);  
interceptors.add(new BridgeInterceptor(client.cookieJar()));  
interceptors.add(new CacheInterceptor(client.internalCache())); 
interceptors.add(new ConnectInterceptor(client));
if(!forWebSocket){  
interceptors.addAll(client.networkInterceptors());  
interceptors.add(new CallServerInterceptor(forWebSocket));  
Interceptor.Chain chain new RealInterceptorChainl 
interceptors, streamAllocation:null,httpCodec:null,connection: null,index: 0,originalRequest);  
return  chain.proceed(originalRequest); 

这个方法是通过拦截器链对请求数据和返回数据进行处理,内部采用责任链模式,将每一个拦截器对应负责的处理任务进行严格分配,最后将交易结果返回并回调暴露给调用者的接口上。

这些拦截器包括:

  1. 用户自定义的拦截器
  2. retryAndFollowUpInterceptor:重试和重定向拦截器,主要负责网络失败重连。
  3. BridgeInterceptor:主要负责添加交易请求头。
  4. CacheInterceptor:缓存拦截器,主要负责拦截缓存。
  5. ConnectInterceptor:网络连接拦截器,主要负责正式开启http请求。
  6. CallServerInterceptor:负责发送网络请求和读取网络响应。

有关每个拦截器的具体实现和内部流程,读者可自行阅读源码了解,这篇文章咱们主要还是分析okhttp的整体架构。

根据上面的源码可以看到,最后交给了RealInterceptorChain(拦截器链类)这个真正的处理类,并调用RealInterceptorChain``的proceed()`方法来实现,具体实现:

OkHttpClient.java x Dispatcherjava x  Util.java x RealCall.java x RealInterceptorChain.java x Nam 
public Response proceed(Fequest request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if(index>interceptors.size()) throw new AssertionError();
calls+;
//Ifwe already have a stream, confirm that the incoming request will use it. if(this.httpCodec≠null8G!this.connection.supportsUrl(request.url())) {
throw newIllegalstateException("network interceptor"+ interceptors.get(index - 1)
+ " must retain the same host and port");}
//Ifwe already have astream, confirm that this is the only call to chain.proceed(). if (this.httpCodec = null sG calls > 1) {
throw newIllegalStateException("network interceptor "+ interceptors.get(index - 1)
+ " must call proceed() exactly once");}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors,streamAllocation,httpCodec,connection, index: index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next);
//Confirm that the next interceptor made its required call to chain.proceed() if(httpCodec≠null8Gindex+1<interceptors.size() 8G next.calls ≠ 1) { throw newIllegalStateException("network interceptor "+ interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null. if (response = null) {
throw new NullPointerException("interceptor"+ interceptor+" returned null");
return response;  
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors,streamAllocation,httpCodec,connection, index: index+ 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor..intercept(next

会调用拦截器的intercept(next)方法,只有当前拦截器的response返回有结果时,才会执行下一个拦截器,因此得出结论:下一个拦截器依赖于当前拦截器的返回,可以保证拦截器的依次执行。

在拦截器链中执行的结果,在同步请求中会直接在response返回,而异步请求:

Request request() { return originalRequest; }
Realcall get() { return Realcall.this; }
@Override protected void execute() {
boolean signalledCallback = false; try {
Response response = getResponseWithInterceptorChain(); if(retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true;
responseCallback.onFailure(call:RealCall.this, new IOException("Canceled")) else {
signalledCallback =true;
responseCallback.onResponse( call:RealCall.this, response);l
catch(IOException e){ if(signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO,message:"Callback failure for "+ toLoggableString(), e); else {
responseCallback.onFailure( call:RealCall.this,e);
finally {
client.dispatcher().finished( call: this);

异步请求时会把拦截器链的处理结果通过CallbackonReponse回调给用户。

总结

至此,okhttp的整体架构分析完毕,建议可以跟着源码一步步去理解,去了解okhttp的设计思想,然后应用到项目开发中。当然,okhttp是一个很庞大的一个框架,这篇文章主要是从它的整体架构方面对其做了简单的分析,内部的实现逻辑和思想都很值得认真思考和细细品味。



相关文章
|
6月前
|
PyTorch 算法框架/工具
Bert Pytorch 源码分析:五、模型架构简图 REV1
Bert Pytorch 源码分析:五、模型架构简图 REV1
91 0
|
6月前
|
PyTorch 算法框架/工具
Bert Pytorch 源码分析:五、模型架构简图
Bert Pytorch 源码分析:五、模型架构简图
65 0
架构系列——通过ReentrantLock源码分析给对象上锁的原理
架构系列——通过ReentrantLock源码分析给对象上锁的原理
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
229 0
|
网络协议 Java
OkHttp架构—异步请求enqueue(不完整篇)
我分为了四个部分,橙色第一部分实例化一个OkHttoClient类对象就可以了。 所有的逻辑大部分在拦截器Interceptors中,但进入拦截器之前还要靠分发器来调配请求任务。 分发器Dispatcher:内部维护队列和线程池,完成请求调配。 拦截器Interceptors:完成整个请求。
603 0
|
消息中间件 缓存 Kafka
Kafka Producer整体架构概述及源码分析(上)
Kafka Producer整体架构概述及源码分析
215 0
Kafka Producer整体架构概述及源码分析(上)
|
XML 设计模式 前端开发
Tomcat的架构与源码分析学习笔记
Tomcat的架构与源码分析学习笔记
Tomcat的架构与源码分析学习笔记
|
消息中间件 Kafka
Kafka Producer整体架构概述及源码分析(下)
Kafka Producer整体架构概述及源码分析
132 0
|
存储 缓存 Java
微服务架构 | *2.5 Nacos 长轮询定时机制的源码分析
为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端; 由于篇幅有限,这里将源码分析分为上下两篇,其中上篇讲获取配置与事件订阅机制,下篇讲长轮询定时机制;在《微服务架构 | 2.2 Alibaba Nacos 的统一配置管理》中提到一张 Nacos 动态监听的长轮询机制原理图,本篇将围绕这张图剖析长轮询定时机制的原理;
1166 1
微服务架构 | *2.5 Nacos 长轮询定时机制的源码分析
|
监控 算法 数据可视化
微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析
调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起; 本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;
320 0
微服务架构 | 5.4 Sentinel 流控、统计和熔断的源码分析
下一篇
无影云桌面