源码阅读 | Okhttp

简介: 源码阅读 | Okhttp

首先先看一下用法:


val client = OkHttpClient.Builder().build()
val request = Request.Builder()
  .url("https://www.baidu.com")
  .build()
val call = client.newCall(request)
call.enqueue(object : okhttp3.Callback {
    override fun onFailure(call: okhttp3.Call, e: IOException) {
    }
    override fun onResponse(call: okhttp3.Call, response: okhttp3.Response) {
        Log.e("TAG", "onResponse: ${response.body?.string()}")
    }
})


首先通过 builder 模式创建了一个 client。然后通过 newCall 方法创建了一个 Call ,最后通过这个 call 进行网络请求。


newCall 方法中传了一个参数,是 Request,这个 Request 是自己拼出来的。这个方法就是通过 request 创建一个 待用的网络请求。


override fun newCall(request: Request): Call {
  return RealCall.newRealCall(this, request, forWebSocket = false)
}
// RealCall 
companion object {
    fun newRealCall(
      client: OkHttpClient,  originalRequest: Request,   forWebSocket: Boolean
    ): RealCall {
        //创建一个 RealCall 
        //client ,requet,webSocket一般情况是用不到      
      return RealCall(client, originalRequest, forWebSocket).apply {
          //Transmitter
        transmitter = Transmitter(client, this)
      }
    }
  }


在 newRealCall 方法中返回了一个 RealCall 对象,由此可知,RealCall 是 Call 的实现类。


enqueue 方法


call.enqueue(object : okhttp3.Callback {
    override fun onFailure(call: okhttp3.Call, e: IOException) {
    }
    override fun onResponse(call: okhttp3.Call, response: okhttp3.Response) {
        Log.e("TAG", "onResponse: ${response.body?.string()}")
    }
})
//RealCall 
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }


在 enqueue 中 将 callback 转交给了 dispather,并创建了一个异步的Call 。下面分别看一下 dispatcher,enqueue 和 AsyncCall 分别是什么东西


dispatcher


val dispatcher: Dispatcher = builder.dispatcher


//异步请求何时执行的策略。每个调度程序使用一个[ExecutorService]在内部运行调用。
//如果您提供自己的执行器,它应该能够同时运行[配置的最大][maxRequests]调用数。
class Dispatcher constructor() {
  // 最大请求数,超过后就会等一等
  @get:Synchronized var maxRequests = 64
  //...... 
  //主机的最大请求数,防止给服务器太大压力
  @get:Synchronized var maxRequestsPerHost = 5
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
    }
      return executorServiceOrNull!!
    }
}


Dispatcher 主要是用来管理线程的,每一个新的请求的过程是需要一个单独的线程,这样不同的请求之间不会被挡着。


他的内部实现用的是 ExecutorService ,


dispatcher 的 enqueue


internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)
  //.....
  }
  promoteAndExecute()
}


调用 readyAsyncCalls ,这个是一个待命的队列,随时准备执行。


private fun promoteAndExecute(): Boolean {
  assert(!Thread.holdsLock(this))
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
     // 遍历待命队列
    while (i.hasNext()) {
      val asyncCall = i.next()
   //判断 最大连接数 和 host 数量,不满足直接 break 
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
  // 从待命中移除 
      i.remove()
      asyncCall.callsPerHost().incrementAndGet()
       //将 asynCall 添加到 list 和 运行队列中
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    // 调用 AsyncCall 的 executeOn 开始执行
    asyncCall.executeOn(executorService)
  }
  return isRunning
}


dispatcher 的 enqueue 主要就是进行了最大连接数和host的判断,然后将 asyncCall 添加到 一个 list 和 运行队列中,最后遍历 list 调用了 AsyncCall 的 executeOn 方法,


下面看一下 AsyncCall


AsyncCall


internal inner class AsyncCall( private val responseCallback: Callback) : Runnable {
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher))
      var success = false
      try {
        //执行  
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          //  
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          //
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
          } else {
            //
            responseCallback.onFailure(this@RealCall, e)
          }
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }
}


注意 executeOn 方法,这个方法就是 在 dispatcher 的 enqueue 中调用的。在这里执行了 executorService 后,对应的 run 方法也就会执行。


run 方法中,通过 getResponseWithInterceptorChain() 进行请求,并拿到响应 response,最后调用 callback,这个 callback 就是我们在请求的时候传入的 callback


在 run 中有一些比较重要的方法,我们到后面说。


到现在使用 enqueue 进行请求的大致逻辑已经非常清楚了,其中最关键的代码就是:


client.dispatcher.enqueue(AsyncCall(responseCallback))


大致流程如下:


1,创建一个 RealCall ,然后调用 enqueue


2,在 enqueue 中调用 dispatcher 的 enqueue


3,在 dispatcher 的 enqueue 方法中触发了 AsyncCall 的 run 方法,


4,在 run 方法中进行请求并响应


execute 方法


val response = call.execute()
1
override fun execute(): Response {
  synchronized(this) {
    check(!executed) { "Already Executed" }
    executed = true
  }
  transmitter.timeoutEnter()
  transmitter.callStart()
  try {
    client.dispatcher.executed(this)
     // 非常直接,直接调用了。
    return getResponseWithInterceptorChain()
  } finally {
    client.dispatcher.finished(this)
  }
}


execute 中不需要切线程,所以就直接调用了。


OkhttpClient 中的配置项


dispatcher


用来调度线程的,对性能进行优化等,例如 maxRequest = 64 等


proxy


代理


protocols


所支持 http 协议的版本


enum class Protocol(private val protocol: String) {
  HTTP_1_0("http/1.0"),
  HTTP_1_1("http/1.1"),
  HTTP_2("h2"),
  //......
 }


connectionSpecs


ConnectionSpecs 是用来指定 http 传输时的 socket 连接;对于 https 连接,ConnectionSpecs 是在构建 TLS 连接时向服务端说明客户端支持的 TLS 版本,密码套件的一类,


在通过 https 连接时,需要向服务器附加上 客户端所支持的 TLS 版本,Cipher suite(可以接受的 对称,非对称和 hash 算法) 等数据。这些都在 ConnectionSpecs 这个类中。


// 支持密码套件的列表
private val RESTRICTED_CIPHER_SUITES = arrayOf(
    // TLSv1.3.
    CipherSuite.TLS_AES_128_GCM_SHA256,
    CipherSuite.TLS_AES_256_GCM_SHA384,
    CipherSuite.TLS_CHACHA20_POLY1305_SHA256,
    // TLSv1.0, TLSv1.1, TLSv1.2.
    CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
    CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
    CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
    CipherSuite.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256)
//支持密码套件的列表
private val APPROVED_CIPHER_SUITES = arrayOf(
    // TLSv1.3.
    CipherSuite.TLS_AES_128_GCM_SHA256,
    CipherSuite.TLS_AES_256_GCM_SHA384,
    CipherSuite.TLS_CHACHA20_POLY1305_SHA256,
    // TLSv1.0, TLSv1.1, TLSv1.2.
    CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
    CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
    CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
    CipherSuite.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
    // Note that the following cipher suites are all on HTTP/2's bad cipher suites list. We'll
    // continue to include them until better suites are commonly available.
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
    CipherSuite.TLS_RSA_WITH_AES_128_GCM_SHA256,
    CipherSuite.TLS_RSA_WITH_AES_256_GCM_SHA384,
    CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
    CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
    CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA)
     /** 安全的TLS连接需要最近的客户端和最近的服务器 */
    @JvmField
    val RESTRICTED_TLS = Builder(true)
        .cipherSuites(*RESTRICTED_CIPHER_SUITES)
        .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2)
        .supportsTlsExtensions(true)
        .build()
    /**
     *现代的TLS的配置,适用于大多数客户端和可以连接到的服务器,是OkHttp的默认配置
     */
    @JvmField
    val MODERN_TLS = Builder(true)
        .cipherSuites(*APPROVED_CIPHER_SUITES)
        .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2)
        .supportsTlsExtensions(true)
        .build()
    /**
     *向后兼容的配置,相比于MODERN_TLS,支持的TLS的版本变少了,只支持TLS_1_0版本
     */
    @JvmField
    val COMPATIBLE_TLS = Builder(true)
        .cipherSuites(*APPROVED_CIPHER_SUITES)
        .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
        .supportsTlsExtensions(true)
        .build()
    /** 未加密未认证的连接,就是HTTP连接*/
    @JvmField
    val CLEARTEXT = Builder(false).build()


上面的 xxxx_TLS 配置的就是你要使用 http 还是使用 https 。


如果你要使用 https ,那么你是用的 tls 版本是多少


interceptors,networkInterceptors


拦截器


eventListenerFactory


用来做统计的。


cookieJar


cookie 的存储器。okhttp 默认没有实现 cookie 的存储,需要自己去存储。


cache


缓存


socketFactory


用来创建 TCP 端口的 ,http 本身是没有端口的。


certificateChainCleaner


从服务器拿下来的证书,有时候会拿到很多个证书,certificateChainCleaner 是用来整理的,整理完后就是一个链或者说是一个序列,最后一个证书就是本地的根证书。


hostnameVerifier


给 https 做主机名验证的,用来验证对方的 host 是不是你需要访问的host


certificatePinner


证书固定器,用来验证自签名证书的!


connectionPool


连接池


followRedirects


遇到 301 的时候是否需要重定向


followSslRedirects


当你访问的是 http ,但是重定向到 https,或者是 访问的是 https ,但是重定向到 http 的时候 是否要重定向。


retryOnConnectionFailure


请求失败的时候是否需要重连


connectTimeout


tcp 连接的时间,超过这个事件则报错


readTimeout


下载响应的时候等待时间


writeTimeout


写入请求的等待时间


pingInterval


针对 webSocket 的,可以双向交互的通道,这就需要长连接了。pingInterval 表示多少时间 ping 一次。


getResponseWithInterceptorChain


在 enqueue 和 execute 方法中,最后都调用的是 getResponseWithInterceptorChain 方法进行请求,在这个方法中会将你的请求进行网络请求,并获得 response。下面看一哈源码


fun getResponseWithInterceptorChain(): Response {
  // 获取 和 创建所有的拦截器
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors
  }
  interceptors += CallServerInterceptor(forWebSocket)
  //
  val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
      client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
  //.....
    val response = chain.proceed(originalRequest)
  //......
}


上面首先是获取了自己添加的拦截器,然后又添加了许多内置的拦截器。


接着就是创建了一个 chain ,这个 chain 就是一个链,这个链里面放的的就是拦截器。


在请求的时候,就会从链的开始往后执行,请求完成之后又会返回来。


chain 的作用:例子,我是店老板,收到了一份快餐的订单,然后我将快餐做好之后交给店里骑手,接着骑手把快餐送到客户家,客户收到后将钱交给骑手,骑手会从钱中拿出一部分当做路饭,剩下的就给老板。


这就是一个链,老板是起始端,负责制作快餐,并交给骑手;中间的节点就是骑手,负责将快餐送给用户;终点就是客户,接收快餐后,进行付钱。


链中的每一个节点都会对 request 做一些处理,并转交给下一个节点,一直到最后,并且会返回到最开始的地方。


而网络请求的时候会创建 chain,其中的节点就是拦截器,他会对 request 做一些或多或少的处理,然后交给下一个节点处理。


override fun proceed(request: Request): Response {
  return proceed(request, transmitter, exchange)
}
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
  if (index >= interceptors.size) throw AssertionError()
  calls++
  //.....
  // Call the next interceptor in the chain.
  val next = RealInterceptorChain(interceptors, transmitter, exchange,
      index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
  val interceptor = interceptors[index]
  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
      "interceptor $interceptor returned null")
 //......
  return response
}


在 proced 中,获取下一个拦截器,并调用他的 intercept 方法,在 intercept 中,又会调用 proced,获取下一个拦截器。。。。;这里就会形成一个循环,直到最后一个节点返回 response ,最终数据就会返回的开始调用的地方。


需要注意的是,在调用下一个拦截器的时候,当前的 intercept 是没有执行完的。


接着看一下一些内置的拦截器


// Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)


RetryAndFollowUpInterceptor


重试,并在必要时跟随重定向。它可能会抛出IOException


BridgeInterceptor(client.cookieJar)


连接应用程序代码和网络代码。首先,它根据用户请求构建网络请求。然后它继续调用网络。最后,它从网络响应构建一个用户响应。


override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()
  val body = userRequest.body
  if (body != null) {
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }
    val contentLength = body.contentLength()
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }
  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", userRequest.url.toHostHeader())
  }
  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive")
  }
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    //支持 gzip 数据类型
    requestBuilder.header("Accept-Encoding", "gzip")
  }
  val cookies = cookieJar.loadForRequest(userRequest.url)
  if (cookies.isNotEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies))
  }
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", userAgent)
  }
  // proced 之前,请求前  
  val networkResponse = chain.proceed(requestBuilder.build())
  // proced 之后,请求后  
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
  val responseBuilder = networkResponse.newBuilder()
      .request(userRequest)
  //解 gzip  
  if (transparentGzip &&
      "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
      networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      val strippedHeaders = networkResponse.headers.newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }
  return responseBuilder.build()
}


在 proced 之前,设置一写通用的设置,这些设置是在 header 中的,如 contentType,长度,host 等一系列东西。


在 proced 之后如果服务器发送的是 gzip 数据,则进行解压,将数据添加到 body 中,最后返回 response


CacheInterceptor


做一些缓存的处理,如 请求之前判断是否有缓存,请求成功后缓存的写入等


ConnectInterceptor


处理 http ,https ,tcp 连接的问题,和网络进行交互,并返回 response


override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val request = realChain.request()
  val transmitter = realChain.transmitter()
  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  val doExtensiveHealthChecks = request.method != "GET"
  val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
  return realChain.proceed(request, transmitter, exchange)
}


networkInterceptors


NetWorkIntercept 是放在最后面的,这种拦截器是用来做网络相关的操作的。


相关文章
|
设计模式 缓存 Java
一步步带你读懂 Okhttp 源码
一步步带你读懂 Okhttp 源码
|
JSON 缓存 前端开发
打造终极MVP+Retrofit2+okhttp3+Rxjava2网络请求,开发实用,简约2
打造终极MVP+Retrofit2+okhttp3+Rxjava2网络请求,开发实用,简约
235 0
|
缓存 JSON 网络协议
打造终极MVP+Retrofit2+okhttp3+Rxjava2网络请求,开发实用,简约1
打造终极MVP+Retrofit2+okhttp3+Rxjava2网络请求,开发实用,简约
323 0
|
设计模式 缓存 监控
OKHttp3 从使用到原理分析
Okhttp3 是我们经常使用的一个网络框架,可扩展性强,支持 get 缓存, spdy、http2.0,gzip 压缩减少数据流量,同步和异步请求,连接池复用机制等特性让广大 android 开发者深爱不已,今天我就带大家从 Okhttp 简单使用,到各种好用拦截器原理了解 Okhttp3
1910 0
OKHttp3 从使用到原理分析
|
安全 Java API
OkHttp官方教程
OkHttp官方教程
486 0
|
缓存 Java 开发工具
Android开发之OkHttp介绍
Android开发之OkHttp介绍
193 0
Android开发之OkHttp介绍
|
存储 Android开发
OkHttp源码详解之Okio源码详解
OkHttp源码详解之Okio源码详解
OkHttp源码详解之Okio源码详解
|
设计模式 缓存 网络协议
从设计模式看OkHttp源码
说到源码,很多朋友都觉得复杂,难理解。
157 0
从设计模式看OkHttp源码
|
缓存 网络协议 Java
OkHttp源码详解之二完结篇
OkHttp源码详解之二完结篇
OkHttp源码详解之二完结篇
|
设计模式 Java API
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )
351 0
【OkHttp】OkHttp 源码分析 ( 网络框架封装 | OkHttp 4 迁移 | OkHttp 建造者模式 )