面试题 | 有用过并发容器吗?有!比如网络请求埋点

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 面试题 | 有用过并发容器吗?有!比如网络请求埋点

引子


网络请求埋点即是在客户端收集网络请求数据并上传云端,为网络性能优化提供数据支持。(本篇网络请求将基于 OkHttp + Retrofit)


通常采集的数据包括如下字段:


  1. ip 地址


  1. 网络类型(蜂窝数据, WIFI)


  1. 用户 id


  1. DNS 耗时


  1. 建立连接耗时


  1. 请求总耗时


  1. 请求 url


  1. 请求方式(GET, POST)


  1. 响应码


  1. 响应协议(HTTP/2, QUIC)


其中 4 到 10 的字段和网络请求强相关。


采集数据


和网络强相关的数据并不能在“一个地方”获取。


OkHttp 提供了okhttp3.EventListener事件监听器来监控网络数据:


abstract class EventListener {
  // 请求开始
  open fun callStart(call: Call) {}
  // dns 开始
  open fun dnsStart(call: Call,domainName: String) {}
  // dns 结束
  open fun dnsEnd(call: Call,domainName: String,inetAddressList: List<@JvmSuppressWildcards InetAddress>) {}
  // 连接开始
  open fun connectStart(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy) {}
  // TLS 开始
  open fun secureConnectStart(call: Call) {}
  // TLS 结束
  open fun secureConnectEnd(call: Call,handshake: Handshake?) {}
  // 连接失败
  open fun connectFailed(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy,protocol: Protocol?,ioe: IOException) {}
  // 请求结束
  open fun callEnd(call: Call) {}
  // 请求失败
  open fun callFailed(call: Call,ioe: IOException) {}
}


事件监听器提供了很多函数,上述是和这次要采集数据相关的。


自定义一个事件监听器:


class TrackEventListener : EventListener() {
    private var callStartMillis: Long? = null // 请求开始毫秒时
    private var dnsStartMillis: Long? = null // dns开始毫秒时
    private var tcpConnectStartMillis: Long? = null // tcp连接开始毫秒时
    private var tlsConnectStartMillis: Long? = null // tls连接开始毫秒时
    private var callDuration = 0L // 请求耗时
    private var dnsDuration = 0L // dns耗时
    private var tcpDuration = 0L // tcp耗时
    private var tlsDuration = 0L // tls耗时
    override fun callStart(call: Call) {
        callStartMillis = System.currentTimeMillis()
    }
    override fun callEnd(call: Call) {
        callStartMillis = callStartMillis ?: System.currentTimeMillis()
        callDuration = System.currentTimeMillis() - callStartMillis!!
    }
    override fun callFailed(call: Call, ioe: IOException) {
        callStartMillis = callStartMillis ?: System.currentTimeMillis()
        callDuration = System.currentTimeMillis() - callStartMillis!!
    }
    override fun dnsStart(call: Call, domainName: String) {
        dnsStartMillis = System.currentTimeMillis()
    }
    override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>) {
        dnsStartMillis = dnsStartMillis ?: System.currentTimeMillis()
        dnsDuration = System.currentTimeMillis() - dnsStartMillis!!
    }
    override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) {
        tcpConnectStartMillis = System.currentTimeMillis()
    }
    override fun secureConnectStart(call: Call) {
        tlsConnectStartMillis = tlsConnectStartMillis ?: System.currentTimeMillis()
        tcpDuration = System.currentTimeMillis() - tcpConnectStartMillis!!
    }
    override fun secureConnectEnd(call: Call, handshake: Handshake?) {
        tlsDuration = System.currentTimeMillis() - tlsConnectStartMillis!!
    }
    override fun connectFailed(call: Call,inetSocketAddress: InetSocketAddress,proxy: Proxy,protocol: Protocol?,ioe: IOException) {
        tcpDuration = System.currentTimeMillis() - tcpConnectStartMillis!!
    }
}


耗时相关数据的采集思路是:在事件开始的回调中记录开始时间点,在对应事件结束回调中统计耗时。


如此一来,耗时相关数据都保存在自定义事件监听器实例中。


剩下的请求 url,请求方式,响应码,响应协议,通过拦截器获取:


class TrackInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        // 沿着责任链继续传递
        val response = chain.proceed(chain.request())
        response.code // 响应码
        response.protocol // 响应协议
        response.request.url // 请求url
        response.request.method // 请求方式
        return response
    }
}


OkHttp 的拦截器是一个 “回形针” ,即请求发出和响应返回会经过一个拦截器两次:


image.png


  • 其中向右的箭头表示逐个遍历拦截器将请求发送出去(递归中的递),表现在代码上即是chain.proceed()的调用。


  • 其中向左的箭头表示响应沿着请求发发出相反的顺序传递给客户端(递归中的归),表现在代码上即是chain.proceed()的返回。


(关于 OkHttp 中使用的拦截器模式的实战应用可以点击面试题 | 怎么写一个又好又快的日志库? - 掘金 (juejin.cn))


为 OkHttpClient 添加一个埋点拦截器就能实现“雁过拔毛”式的埋点:


OkHttpClient.Builder()
    .addInterceptor(TrackInterceptor())
    .build()


标识网络请求


现在的问题是,有些数据在 EventListener 中,而另一些在拦截其中,如何将这些数据联系起来?即如何判定两处产生的数据属于同一个请求?


那就为每个请求生成一个 id。


Retrofit 提供了一个构建请求的工厂类:


fun interface Factory {
  fun newCall(request: Request): Call
}


抽象方法newCall()用来定义如何构建一个请求。


// 自定义 Factory,构造时传入另一个 Factory(装饰者模式)
class TrackCallFactory(private val factory: Call.Factory) : Call.Factory {
    private val callId = AtomicLong(1L) // 唯一标识一个请求
    override fun newCall(request: Request): Call {
        val id = callId.getAndIncrement() // 获取新请求id
        // 重构 Request 实例,并通过tag方式带上请求id
        val newRequest = request.newBuilder().tag(id).build()
        // 将新请求传递给被装饰的 factory
        return factory.newCall(newRequest)
    }
}


自定义一个 Call.Factory,并在构造函数中传入另一个 Factory 实例,目的是“复用构建请求的行为”,并在此之上,扩展新的行为:为每个请求添加 id。(这是装饰者模式,关于该设计模式的详解可以点击使用组合的设计模式 | 美颜相机中的装饰者模式 - 掘金 (juejin.cn)


请求 id 被定义成一个 Long,并从 1 开始自增,通过请求的 tag 和其绑定。


然后在构建 Retrofit 实例时指定 Factory 即可:


// 构建 OkHttpClient 实例,它也是一个 Call.Factory
val okHttpClient = OkHttpClient.Builder()
    .addInterceptor(TrackInterceptor())
    .build()
// 构建 Retrofit 实例
val retrofit: Retrofit = Retrofit.Builder()
    .callFactory(TrackCallFactory(okHttpClient))
    .addConverterFactory(GsonConverterFactory.create())
    .build()


请求 id 和 Request 对象绑定后,就能在埋点拦截器中拿到


class TrackInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val response = chain.proceed(chain.request())
        // 获取请求 id
        val callId = chain.request().tag() as? Long
        callId?.let {
            response.code // 响应码
            response.protocol // 响应协议
            response.request.url // 请求url
            response.request.method // 请求方式
        }
        return response
    }
}


EventListener 的每个回调也都提供了 Call 对象,可通过它获取 Request 对象:


class TrackEventListener : EventListener() {
    override fun callStart(call: Call) {
        // 在回调中获取请求id
        val callId = call.request().tag() as? Long
    }
    ...
}


汇总数据


事件监听器和拦截器的数据得汇总到一起,才能得出一个请求完整的网络数据。


想到的办法是,将这些数据统统写入一个的列表中:


typealias CallInfo = Triple<Long, String, Any>
val datas = mutableListOf<CallInfo>()


将请求埋点数据设计为一个三元组Triple,其中第一个元素是请求 id,第二个是键,第三个是值。


为了增加三元组的语义,降低复杂度,使用typealias语法为三元组取了一个新名字CallInfo


数据容器被设计成顶层 CallInfo 列表,以方便在各处访问。


在拦截器中将数据写入列表:


class TrackInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val response = chain.proceed(chain.request())
        val callId = chain.request().tag() as? Long
        callId?.let {
            datas.add(CallInfo(it, "code", response.code))
            datas.add(CallInfo(it, "protocol", response.protocol))
            datas.add(CallInfo(it, "url", response.request.url))
            datas.add(CallInfo(it, "method", response.request.method))
        }
        return response
    }
}


在事件监听器中将数据写入列表:


class TrackEventListener : EventListener() {
    override fun callEnd(call: Call) {
        val callId = call.request().tag() as? Long
        datas.add(CallInfo(callId, "duration", callDuration))
    }
    ...
}


并发请求数据错乱


OkHttp 的请求是并发的,若所有请求共用一个事件监听器,则数据会发生错乱。


不得不为每个请求申请独立的事件监听器,OkHttp 提供了一个事件监听器的工厂方法:


fun interface Factory {
  fun create(call: Call): EventListener
}


抽象方法 create() 用于定义如何为一个请求构建对应的事件监听器,自定义工厂如下:


object TrackEventListenerFactory : EventListener.Factory {
    override fun create(call: Call): EventListener {
        val callId = call.request().tag() as? Long // 获取请求id 
        return TrackEventListener(callId) // 将请求id传递给事件监听器 
    }
}
// 自定义事件监听器也做相应的改动,新增成员 callId
class TrackEventListener(private val callId: Long?) : EventListener() {}


然后在构建 OkHttpClient 实例的时候设置工厂即可:


OkHttpClient.Builder()
    .addInterceptor(TrackInterceptor())
    .eventListenerFactory(TrackEventListenerFactory) // 指定事件监听器工厂
    .build()


数据容器选择


CopyOnWriteArrayList


即使是这样,数据依然会错乱。


因为 OkHttp 的异步请求是在不同线程中执行的,即事件监听器是在不同线程回调的。所以就会发生多线程并发写数据容器,存在线程安全问题。


第一个想到的线程安全的容器是CopyOnWriteArrayList,于是乎有了网络埋点的第一个版本:


class TrackEventListener(private val callId: Long?) : EventListener() {
    private var callStartMillis: Long? = null
    private var callDuration = 0L
    ...
    companion object {
        // 数据容器
        private val trackers = CopyOnWriteArrayList<Triple<Long, String, Any>>()
        // 写数据
        fun put(callId: Long, key: String, value: Any) {
            trackers.add(Triple(callId, key, value))
        }
        // 消费数据:读取一个请求的所有数据,并组织成 map
        fun get(callId: Long): Map<String, Any> =
            trackers.filter { it.first == callId }
                .map { it.second to it.third }
                .let { mapOf(*it.toTypedArray()) }
        // 移除一个请求的所有数据
        fun removeAll(callId: Long) {
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
                trackers.removeIf { it.first == callId }
            }else {
                synchronized(trackers){
                    trackers.removeAll { it.first == callId }
                }
            }
        }
        // 回调数据给上层的接口
        var networkTrackCallback: NetworkTrackCallback? = null
    }
    override fun callStart(call: Call) {
        callId?.let { callStartMillis = System.currentTimeMillis() }
    }
    override fun callEnd(call: Call) {
        callId?.let {
            callStartMillis = callStartMillis ?: System.currentTimeMillis()
            callDuration = System.currentTimeMillis() - callStartMillis!!
            // 写数据
            put(callId, "duration", callDuration)
            // 将数据回调给上层
            networkTrackCallback?.onCallEnd(get(callId))
            // 移除当前请求的所有数据
            removeAll(callId)
        }
    }
    ...
}
// 网络数据回调
interface NetworkTrackCallback {
    fun onCallEnd(map: Map<String, Any>)
}


埋点拦截器也做了相应修改将数据写到 CopyOnWriteArrayList:


class TrackInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val response = chain.proceed(chain.request())
        val callId = chain.request().tag() as? Long
        callId?.let {
            TrackEventListener.put(it, "code", response.code)
            TrackEventListener.put(it, "protocol", response.protocol)
            TrackEventListener.put(it, "url", response.request.url)
            TrackEventListener.put(it, "method", response.request.method)
        }
        return response
    }
}


单个请求数据的起点是callStart(),相应的终点是callEnd(),所以将消费数据的逻辑写在了其中。


消费数据的方式是遍历容器,过滤出给定请求 id 的所有记录,将它们的键和值组织成 Map,并回调NetworkTrackCallback.onCallEnd(map: Map)。业务层通过实现该接口,就能拿到与网络强相关的埋点数据,只需在 map 结构中拼接上剩下的数据,即可将数据上报云端。


选用CopyOnWriteArrayList其实没什么大毛病,只是性能上会稍微差一点。


CopyOnWriteArrayList 采用数组作为容器,每次写数据时,将原有数组拷贝到一块新内存,在新数组末尾追加数据,最后将数组引用指向新数组:


public class CopyOnWriteArrayList<E> {
    // 容器
    private transient volatile Object[] array;
    // 插入元素
    public boolean add(E e) {
        synchronized (lock) {// 防止并发写导致复制出好几份副本
            Object[] elements = getArray();
            int len = elements.length;
            // 原数组的副本
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            // 在新数组末尾追加元素
            newElements[len] = e;
            // 将容器引用指向新数组
            setArray(newElements);
            return true;
        }
    }
}


CopyOnWriteArrayList 的写操作被上了对象锁,遂无法实现并发写,即当一个线程成功获取锁后,其他竞争线程陷入阻塞。


相比之下 CopyOnWriteArrayList 的读数据操作性能就很好了:


// java.util.concurrent.CopyOnWriteArrayList
public E get(int index) {
    return get(getArray(), index);
}
private E get(Object[] a, int index) {
    return (E) a[index];
}
final Object[] getArray() {
    return array;
}


就是普通的取数组元素,速度很快。


由于对象锁和数组拷贝操作导致 CopyOnWriteArrayList 的写操作无法实现并发写,且若写入操作频繁,则会增加内存压力。


ConcurrentLinkedQueue


App 并发地请求多个接口是很常见的,对于网络请求埋点来说,即存在多线程并发写数据容器的场景。


若使用 CopyOnWriteArrayList 作为数据容器,不仅会增加内存压力,甚至还可能造成请求响应被阻塞的情况,比如多个请求同时返回,其中一个线程写埋点数据时获取锁,则其他线程必定被阻塞。


哪个线性容器可以实现真正地并发写?非ConcurrentLinkedQueue莫属!


ConcurrentLinkedQueue 是一个队列,即存队尾,取队头。存储结构是带头尾指针的单链表。它用非阻塞的方式实现了线程安全,即未使用锁,而是使用CAS + volatile保证了修改头尾指针的线程安全。


关于 ConcurrentLinkedQueue 的更详细分析可以点击面试题 | 徒手写一个 ConcurrentLinkedQueue?


理论上,在网络请求埋点的场景下,ConcurrentLinkedQueue 作为数据容器的性能要好于 CopyOnWriteArrayList。因为它可以实现真正的并发写,并且链式结构也不需要数组拷贝。


写一个 demo 验证下性能差异:


class ConcurrentActivity : AppCompatActivity() {
    // 模拟多线程环境,OkHttp 最多 64 个并行任务。
    private val executor = Executors.newFixedThreadPool(64)
    // CopyOnWriteArrayList 容器
    private val cowArrayList = CopyOnWriteArrayList<CallInfo>()
    // ConcurrentLinkedQueue 容器
    private val concurrentQueue = ConcurrentLinkedQueue<CallInfo>()
    private val mainScope = MainScope()
    private var start = 0L
    // 获取容器,分别将返回值改为 cowArrayList,concurrentQueue 进行测试
    private fun getList() = cowArrayList
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // 性能测试计时开始
        start = System.currentTimeMillis()
        // 模拟连续的 1000 个请求(并发写场景)
        repeat(1000) {
            executor.execute {
                getList().add(CallInfo(it.toLong(), "code", 200))
                getList().add(CallInfo(it.toLong(), "url", "https://www.ddd.com"))
                getList().add(CallInfo(it.toLong(), "protocol", "QUIC"))
                getList().add(CallInfo(it.toLong(), "method", "GET"))
            }
        }
        // 模拟 1000 个请求的响应
        mainScope.launch {
            // 模拟消费网络数据
            repeat(1000) { callId ->
                executor.execute {
                    getList().add(CallInfo(callId.toLong(), "duration", 10000))
                    // 消费网络数据
                    get(callId.toLong())
                    getList().removeIf { it.first == callId.toLong() }
                }
            }
            //等待所有异步任务完成
            executor.shutdown()
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
            val size = getList().size
            // 输出性能耗时
            Log.v(
                "ttaylor",
                "onCreate() size=${size} consume=${System.currentTimeMillis() - start}"
            )
        }
    }
    // 消费同一个请求的网络数据,将他们组织成 map
    private fun get(callId: Long): Map<String, Any> {
        return getList().filter { it.first == callId }
            .map { it.second to it.third }
            .let { mapOf(*it.toTypedArray()) }
    }
}
// 网络数据实体类
typealias CallInfo = Triple<Long, String, Any>


模拟了连续发出的 1000 个网络请求,最大并发数为 64,统计使用不同容器消费掉所有网络数据的总耗时和内存占用,打印 log 如下:


// CopyOnWriteArrayList
ttaylor: onCreate() size=0 consume=455
ttaylor: onCreate() size=0 consume=337
ttaylor: onCreate() size=0 consume=262
ttaylor: onCreate() size=0 consume=247
// ConcurrentLinkedQueue
ttaylor: onCreate() size=0 consume=155
ttaylor: onCreate() size=0 consume=102
ttaylor: onCreate() size=0 consume=102
ttaylor: onCreate() size=0 consume=103


时间性能上差距非常明显,ConcurrentLinkedQueue 消耗掉所有网络数据的时间是 CopyOnWriteArrayList 的一半不到。


使用 AndroidStudio 的 profile 观察内存,demo 启动后内存稳定在 111 MB,使用 CopyOnWriteArrayList 时,内存最大飙升到 155 MB,而使用 ConcurrentLinkedQueue 的峰值内存为 132 MB。


总结


  • 基于 OkHttp + Retrofit 的网络数据埋点可通过事件监听器EventListener和拦截器采集数据。


  • 对于并发请求,得给每个请求分配一个 EventListener 以防止数据混乱。


  • 埋点数据的容器需考虑线程安全问题。


  • CopyOnWriteArrayList是一个以数组为存储介质的线性容器,它是线程安全的。写数据操作上锁了,即不允许并发写。执行写操作时会将原先数组拷贝一份,并在新数组尾部插入数据,最后将数组引用指向新数组。这样设计的目的是实现“读与写的并发”,即一个线程读的同时允许另一个线程写(其中读包括 get 和 iterate)。


  • ConcurrentLinkedQueue是一个以单链表为存储介质的线性队列,它是线程安全的。读写操作都没有上锁,可实现真正意义上的并发写,采用 CAS + volatile 实现线程安全。


推荐阅读


面试系列文章如下:


面试题 | 怎么写一个又好又快的日志库?(一)


面试题 | 怎么写一个又好又快的日志库?(二)


面试题 | 徒手写一个 ConcurrentLinkedQueue?


来讨论下 Android 面试该问什么类型的题目?


RecyclerView 面试题 | 哪些情况下表项会被回收到缓存池?


面试题 | 有用过并发容器吗?有!比如网络请求埋点



目录
相关文章
|
8天前
|
人工智能 弹性计算 运维
ACK Edge与IDC:高效容器网络通信新突破
本文介绍如何基于ACK Edge以及高效的容器网络插件管理IDC进行容器化。
|
19天前
|
存储 缓存 监控
Docker容器性能调优的关键技巧,涵盖CPU、内存、网络及磁盘I/O的优化策略,结合实战案例,旨在帮助读者有效提升Docker容器的性能与稳定性。
本文介绍了Docker容器性能调优的关键技巧,涵盖CPU、内存、网络及磁盘I/O的优化策略,结合实战案例,旨在帮助读者有效提升Docker容器的性能与稳定性。
54 7
|
27天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
Docker 容器
docker中检查容器的网络模式
【10月更文挑战第5天】
220 1
|
2月前
|
网络协议 Shell 网络安全
docker容器网络问题
【10月更文挑战第4天】
203 2
|
2月前
|
网络协议 Ubuntu 前端开发
好好的容器突然起不来,经定位是容器内无法访问外网了?测试又说没改网络配置,该如何定位网络问题
本文记录了一次解决前端应用集成到主应用后出现502错误的问题。通过与测试人员的沟通,最终发现是DNS配置问题导致的。文章详细描述了问题的背景、沟通过程、解决方案,并总结了相关知识点和经验教训,帮助读者学习如何分析和定位网络问题。
115 0
|
3月前
|
NoSQL 应用服务中间件 Redis
Docker跨宿主机容器通信-通过网络跨宿主机互联
这篇文章介绍了Docker容器跨宿主机通信的实现方法,包括Docker的四种网络模式(host、none、container、bridge)以及如何通过修改网络配置和添加路由规则来实现不同宿主机上的容器之间的互联。
270 0
Docker跨宿主机容器通信-通过网络跨宿主机互联
|
3月前
|
网络协议 C语言
C语言 网络编程(十三)并发的TCP服务端-以进程完成功能
这段代码实现了一个基于TCP协议的多进程并发服务端和客户端程序。服务端通过创建子进程来处理多个客户端连接,解决了粘包问题,并支持不定长数据传输。客户端则循环发送数据并接收服务端回传的信息,同样处理了粘包问题。程序通过自定义的数据长度前缀确保了数据的完整性和准确性。
|
3月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
3月前
|
C语言
C语言 网络编程(八)并发的UDP服务端 以进程完成功能
这段代码展示了如何使用多进程处理 UDP 客户端和服务端通信。客户端通过发送登录请求与服务端建立连接,并与服务端新建的子进程进行数据交换。服务端则负责接收请求,验证登录信息,并创建子进程处理客户端的具体请求。子进程会创建一个新的套接字与客户端通信,实现数据收发功能。此方案有效利用了多进程的优势,提高了系统的并发处理能力。