引子
网络请求埋点即是在客户端收集网络请求数据并上传云端,为网络性能优化提供数据支持。(本篇网络请求将基于 OkHttp + Retrofit)
通常采集的数据包括如下字段:
- ip 地址
- 网络类型(蜂窝数据, WIFI)
- 用户 id
- DNS 耗时
- 建立连接耗时
- 请求总耗时
- 请求 url
- 请求方式(GET, POST)
- 响应码
- 响应协议(HTTP/2, QUIC)
其中 4 到 10 的字段和网络请求强相关。
采集数据
和网络强相关的数据并不能在“一个地方”获取。
OkHttp 提供了okhttp3.EventListener
事件监听器来监控网络数据:
abstract class EventListener { // 请求开始 open fun callStart(call: Call) {} // dns 开始 open fun dnsStart(call: Call,domainName: String) {} // dns 结束 open fun dnsEnd(call: Call,domainName: String,inetAddressList: List<@JvmSuppressWildcards InetAddress>) {} // 连接开始 open fun connectStart(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy) {} // TLS 开始 open fun secureConnectStart(call: Call) {} // TLS 结束 open fun secureConnectEnd(call: Call,handshake: Handshake?) {} // 连接失败 open fun connectFailed(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy,protocol: Protocol?,ioe: IOException) {} // 请求结束 open fun callEnd(call: Call) {} // 请求失败 open fun callFailed(call: Call,ioe: IOException) {} }
事件监听器提供了很多函数,上述是和这次要采集数据相关的。
自定义一个事件监听器:
class TrackEventListener : EventListener() { private var callStartMillis: Long? = null // 请求开始毫秒时 private var dnsStartMillis: Long? = null // dns开始毫秒时 private var tcpConnectStartMillis: Long? = null // tcp连接开始毫秒时 private var tlsConnectStartMillis: Long? = null // tls连接开始毫秒时 private var callDuration = 0L // 请求耗时 private var dnsDuration = 0L // dns耗时 private var tcpDuration = 0L // tcp耗时 private var tlsDuration = 0L // tls耗时 override fun callStart(call: Call) { callStartMillis = System.currentTimeMillis() } override fun callEnd(call: Call) { callStartMillis = callStartMillis ?: System.currentTimeMillis() callDuration = System.currentTimeMillis() - callStartMillis!! } override fun callFailed(call: Call, ioe: IOException) { callStartMillis = callStartMillis ?: System.currentTimeMillis() callDuration = System.currentTimeMillis() - callStartMillis!! } override fun dnsStart(call: Call, domainName: String) { dnsStartMillis = System.currentTimeMillis() } override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>) { dnsStartMillis = dnsStartMillis ?: System.currentTimeMillis() dnsDuration = System.currentTimeMillis() - dnsStartMillis!! } override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) { tcpConnectStartMillis = System.currentTimeMillis() } override fun secureConnectStart(call: Call) { tlsConnectStartMillis = tlsConnectStartMillis ?: System.currentTimeMillis() tcpDuration = System.currentTimeMillis() - tcpConnectStartMillis!! } override fun secureConnectEnd(call: Call, handshake: Handshake?) { tlsDuration = System.currentTimeMillis() - tlsConnectStartMillis!! } override fun connectFailed(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy,protocol: Protocol?,ioe: IOException) { tcpDuration = System.currentTimeMillis() - tcpConnectStartMillis!! } }
耗时相关数据的采集思路是:在事件开始的回调中记录开始时间点,在对应事件结束回调中统计耗时。
如此一来,耗时相关数据都保存在自定义事件监听器实例中。
剩下的请求 url,请求方式,响应码,响应协议,通过拦截器获取:
class TrackInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { // 沿着责任链继续传递 val response = chain.proceed(chain.request()) response.code // 响应码 response.protocol // 响应协议 response.request.url // 请求url response.request.method // 请求方式 return response } }
OkHttp 的拦截器是一个 “回形针” ,即请求发出和响应返回会经过一个拦截器两次:
- 其中向右的箭头表示逐个遍历拦截器将请求发送出去(递归中的递),表现在代码上即是
chain.proceed()
的调用。
- 其中向左的箭头表示响应沿着请求发发出相反的顺序传递给客户端(递归中的归),表现在代码上即是
chain.proceed()
的返回。
(关于 OkHttp 中使用的拦截器模式的实战应用可以点击面试题 | 怎么写一个又好又快的日志库? - 掘金 (juejin.cn))
为 OkHttpClient 添加一个埋点拦截器就能实现“雁过拔毛”式的埋点:
OkHttpClient.Builder() .addInterceptor(TrackInterceptor()) .build()
标识网络请求
现在的问题是,有些数据在 EventListener 中,而另一些在拦截其中,如何将这些数据联系起来?即如何判定两处产生的数据属于同一个请求?
那就为每个请求生成一个 id。
Retrofit 提供了一个构建请求的工厂类:
fun interface Factory { fun newCall(request: Request): Call }
抽象方法newCall()
用来定义如何构建一个请求。
// 自定义 Factory,构造时传入另一个 Factory(装饰者模式) class TrackCallFactory(private val factory: Call.Factory) : Call.Factory { private val callId = AtomicLong(1L) // 唯一标识一个请求 override fun newCall(request: Request): Call { val id = callId.getAndIncrement() // 获取新请求id // 重构 Request 实例,并通过tag方式带上请求id val newRequest = request.newBuilder().tag(id).build() // 将新请求传递给被装饰的 factory return factory.newCall(newRequest) } }
自定义一个 Call.Factory,并在构造函数中传入另一个 Factory 实例,目的是“复用构建请求的行为”,并在此之上,扩展新的行为:为每个请求添加 id。(这是装饰者模式,关于该设计模式的详解可以点击使用组合的设计模式 | 美颜相机中的装饰者模式 - 掘金 (juejin.cn))
请求 id 被定义成一个 Long,并从 1 开始自增,通过请求的 tag 和其绑定。
然后在构建 Retrofit 实例时指定 Factory 即可:
// 构建 OkHttpClient 实例,它也是一个 Call.Factory val okHttpClient = OkHttpClient.Builder() .addInterceptor(TrackInterceptor()) .build() // 构建 Retrofit 实例 val retrofit: Retrofit = Retrofit.Builder() .callFactory(TrackCallFactory(okHttpClient)) .addConverterFactory(GsonConverterFactory.create()) .build()
请求 id 和 Request 对象绑定后,就能在埋点拦截器中拿到
class TrackInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { val response = chain.proceed(chain.request()) // 获取请求 id val callId = chain.request().tag() as? Long callId?.let { response.code // 响应码 response.protocol // 响应协议 response.request.url // 请求url response.request.method // 请求方式 } return response } }
EventListener 的每个回调也都提供了 Call 对象,可通过它获取 Request 对象:
class TrackEventListener : EventListener() { override fun callStart(call: Call) { // 在回调中获取请求id val callId = call.request().tag() as? Long } ... }
汇总数据
事件监听器和拦截器的数据得汇总到一起,才能得出一个请求完整的网络数据。
想到的办法是,将这些数据统统写入一个的列表中:
typealias CallInfo = Triple<Long, String, Any> val datas = mutableListOf<CallInfo>()
将请求埋点数据设计为一个三元组Triple
,其中第一个元素是请求 id,第二个是键,第三个是值。
为了增加三元组的语义,降低复杂度,使用typealias
语法为三元组取了一个新名字CallInfo
数据容器被设计成顶层 CallInfo 列表,以方便在各处访问。
在拦截器中将数据写入列表:
class TrackInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { val response = chain.proceed(chain.request()) val callId = chain.request().tag() as? Long callId?.let { datas.add(CallInfo(it, "code", response.code)) datas.add(CallInfo(it, "protocol", response.protocol)) datas.add(CallInfo(it, "url", response.request.url)) datas.add(CallInfo(it, "method", response.request.method)) } return response } }
在事件监听器中将数据写入列表:
class TrackEventListener : EventListener() { override fun callEnd(call: Call) { val callId = call.request().tag() as? Long datas.add(CallInfo(callId, "duration", callDuration)) } ... }
并发请求数据错乱
OkHttp 的请求是并发的,若所有请求共用一个事件监听器,则数据会发生错乱。
不得不为每个请求申请独立的事件监听器,OkHttp 提供了一个事件监听器的工厂方法:
fun interface Factory { fun create(call: Call): EventListener }
抽象方法 create() 用于定义如何为一个请求构建对应的事件监听器,自定义工厂如下:
object TrackEventListenerFactory : EventListener.Factory { override fun create(call: Call): EventListener { val callId = call.request().tag() as? Long // 获取请求id return TrackEventListener(callId) // 将请求id传递给事件监听器 } } // 自定义事件监听器也做相应的改动,新增成员 callId class TrackEventListener(private val callId: Long?) : EventListener() {}
然后在构建 OkHttpClient 实例的时候设置工厂即可:
OkHttpClient.Builder() .addInterceptor(TrackInterceptor()) .eventListenerFactory(TrackEventListenerFactory) // 指定事件监听器工厂 .build()
数据容器选择
CopyOnWriteArrayList
即使是这样,数据依然会错乱。
因为 OkHttp 的异步请求是在不同线程中执行的,即事件监听器是在不同线程回调的。所以就会发生多线程并发写数据容器,存在线程安全问题。
第一个想到的线程安全的容器是CopyOnWriteArrayList
,于是乎有了网络埋点的第一个版本:
class TrackEventListener(private val callId: Long?) : EventListener() { private var callStartMillis: Long? = null private var callDuration = 0L ... companion object { // 数据容器 private val trackers = CopyOnWriteArrayList<Triple<Long, String, Any>>() // 写数据 fun put(callId: Long, key: String, value: Any) { trackers.add(Triple(callId, key, value)) } // 消费数据:读取一个请求的所有数据,并组织成 map fun get(callId: Long): Map<String, Any> = trackers.filter { it.first == callId } .map { it.second to it.third } .let { mapOf(*it.toTypedArray()) } // 移除一个请求的所有数据 fun removeAll(callId: Long) { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) { trackers.removeIf { it.first == callId } }else { synchronized(trackers){ trackers.removeAll { it.first == callId } } } } // 回调数据给上层的接口 var networkTrackCallback: NetworkTrackCallback? = null } override fun callStart(call: Call) { callId?.let { callStartMillis = System.currentTimeMillis() } } override fun callEnd(call: Call) { callId?.let { callStartMillis = callStartMillis ?: System.currentTimeMillis() callDuration = System.currentTimeMillis() - callStartMillis!! // 写数据 put(callId, "duration", callDuration) // 将数据回调给上层 networkTrackCallback?.onCallEnd(get(callId)) // 移除当前请求的所有数据 removeAll(callId) } } ... } // 网络数据回调 interface NetworkTrackCallback { fun onCallEnd(map: Map<String, Any>) }
埋点拦截器也做了相应修改将数据写到 CopyOnWriteArrayList:
class TrackInterceptor : Interceptor { override fun intercept(chain: Interceptor.Chain): Response { val response = chain.proceed(chain.request()) val callId = chain.request().tag() as? Long callId?.let { TrackEventListener.put(it, "code", response.code) TrackEventListener.put(it, "protocol", response.protocol) TrackEventListener.put(it, "url", response.request.url) TrackEventListener.put(it, "method", response.request.method) } return response } }
单个请求数据的起点是callStart()
,相应的终点是callEnd()
,所以将消费数据的逻辑写在了其中。
消费数据的方式是遍历容器,过滤出给定请求 id 的所有记录,将它们的键和值组织成 Map,并回调NetworkTrackCallback.onCallEnd(map: Map)
。业务层通过实现该接口,就能拿到与网络强相关的埋点数据,只需在 map 结构中拼接上剩下的数据,即可将数据上报云端。
选用CopyOnWriteArrayList
其实没什么大毛病,只是性能上会稍微差一点。
CopyOnWriteArrayList 采用数组作为容器,每次写数据时,将原有数组拷贝到一块新内存,在新数组末尾追加数据,最后将数组引用指向新数组:
public class CopyOnWriteArrayList<E> { // 容器 private transient volatile Object[] array; // 插入元素 public boolean add(E e) { synchronized (lock) {// 防止并发写导致复制出好几份副本 Object[] elements = getArray(); int len = elements.length; // 原数组的副本 Object[] newElements = Arrays.copyOf(elements, len + 1); // 在新数组末尾追加元素 newElements[len] = e; // 将容器引用指向新数组 setArray(newElements); return true; } } }
CopyOnWriteArrayList 的写操作被上了对象锁,遂无法实现并发写,即当一个线程成功获取锁后,其他竞争线程陷入阻塞。
相比之下 CopyOnWriteArrayList 的读数据操作性能就很好了:
// java.util.concurrent.CopyOnWriteArrayList public E get(int index) { return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; } final Object[] getArray() { return array; }
就是普通的取数组元素,速度很快。
由于对象锁和数组拷贝操作导致 CopyOnWriteArrayList 的写操作无法实现并发写,且若写入操作频繁,则会增加内存压力。
ConcurrentLinkedQueue
App 并发地请求多个接口是很常见的,对于网络请求埋点来说,即存在多线程并发写数据容器的场景。
若使用 CopyOnWriteArrayList 作为数据容器,不仅会增加内存压力,甚至还可能造成请求响应被阻塞的情况,比如多个请求同时返回,其中一个线程写埋点数据时获取锁,则其他线程必定被阻塞。
哪个线性容器可以实现真正地并发写?非ConcurrentLinkedQueue
莫属!
ConcurrentLinkedQueue 是一个队列,即存队尾,取队头。存储结构是带头尾指针的单链表。它用非阻塞的方式实现了线程安全,即未使用锁,而是使用CAS + volatile
保证了修改头尾指针的线程安全。
关于 ConcurrentLinkedQueue 的更详细分析可以点击面试题 | 徒手写一个 ConcurrentLinkedQueue?
理论上,在网络请求埋点的场景下,ConcurrentLinkedQueue 作为数据容器的性能要好于 CopyOnWriteArrayList。因为它可以实现真正的并发写,并且链式结构也不需要数组拷贝。
写一个 demo 验证下性能差异:
class ConcurrentActivity : AppCompatActivity() { // 模拟多线程环境,OkHttp 最多 64 个并行任务。 private val executor = Executors.newFixedThreadPool(64) // CopyOnWriteArrayList 容器 private val cowArrayList = CopyOnWriteArrayList<CallInfo>() // ConcurrentLinkedQueue 容器 private val concurrentQueue = ConcurrentLinkedQueue<CallInfo>() private val mainScope = MainScope() private var start = 0L // 获取容器,分别将返回值改为 cowArrayList,concurrentQueue 进行测试 private fun getList() = cowArrayList override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) // 性能测试计时开始 start = System.currentTimeMillis() // 模拟连续的 1000 个请求(并发写场景) repeat(1000) { executor.execute { getList().add(CallInfo(it.toLong(), "code", 200)) getList().add(CallInfo(it.toLong(), "url", "https://www.ddd.com")) getList().add(CallInfo(it.toLong(), "protocol", "QUIC")) getList().add(CallInfo(it.toLong(), "method", "GET")) } } // 模拟 1000 个请求的响应 mainScope.launch { // 模拟消费网络数据 repeat(1000) { callId -> executor.execute { getList().add(CallInfo(callId.toLong(), "duration", 10000)) // 消费网络数据 get(callId.toLong()) getList().removeIf { it.first == callId.toLong() } } } //等待所有异步任务完成 executor.shutdown() executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) val size = getList().size // 输出性能耗时 Log.v( "ttaylor", "onCreate() size=${size} consume=${System.currentTimeMillis() - start}" ) } } // 消费同一个请求的网络数据,将他们组织成 map private fun get(callId: Long): Map<String, Any> { return getList().filter { it.first == callId } .map { it.second to it.third } .let { mapOf(*it.toTypedArray()) } } } // 网络数据实体类 typealias CallInfo = Triple<Long, String, Any>
模拟了连续发出的 1000 个网络请求,最大并发数为 64,统计使用不同容器消费掉所有网络数据的总耗时和内存占用,打印 log 如下:
// CopyOnWriteArrayList ttaylor: onCreate() size=0 consume=455 ttaylor: onCreate() size=0 consume=337 ttaylor: onCreate() size=0 consume=262 ttaylor: onCreate() size=0 consume=247 // ConcurrentLinkedQueue ttaylor: onCreate() size=0 consume=155 ttaylor: onCreate() size=0 consume=102 ttaylor: onCreate() size=0 consume=102 ttaylor: onCreate() size=0 consume=103
时间性能上差距非常明显,ConcurrentLinkedQueue 消耗掉所有网络数据的时间是 CopyOnWriteArrayList 的一半不到。
使用 AndroidStudio 的 profile 观察内存,demo 启动后内存稳定在 111 MB,使用 CopyOnWriteArrayList 时,内存最大飙升到 155 MB,而使用 ConcurrentLinkedQueue 的峰值内存为 132 MB。
总结
- 基于 OkHttp + Retrofit 的网络数据埋点可通过事件监听器
EventListener
和拦截器采集数据。
- 对于并发请求,得给每个请求分配一个 EventListener 以防止数据混乱。
- 埋点数据的容器需考虑线程安全问题。
CopyOnWriteArrayList
是一个以数组为存储介质的线性容器,它是线程安全的。写数据操作上锁了,即不允许并发写。执行写操作时会将原先数组拷贝一份,并在新数组尾部插入数据,最后将数组引用指向新数组。这样设计的目的是实现“读与写的并发”,即一个线程读的同时允许另一个线程写(其中读包括 get 和 iterate)。
ConcurrentLinkedQueue
是一个以单链表为存储介质的线性队列,它是线程安全的。读写操作都没有上锁,可实现真正意义上的并发写,采用 CAS + volatile 实现线程安全。
推荐阅读
面试系列文章如下:
面试题 | 徒手写一个 ConcurrentLinkedQueue?
RecyclerView 面试题 | 哪些情况下表项会被回收到缓存池?