上一篇博客介绍了 AnchorTask 的基本使用,今天,让我们一起看一下怎么实现它。
原理简介
AnchorTask,锚点任务,它的实现原理是构建一个有向无环图,拓扑排序之后,如果任务 B 依赖任务 A,那么 A 一定排在任务 B 之前。
了解原理之前,请必须先了解有向无环图和多线程的一些基本知识,不然,下文,你基本是看不懂的。
一个共识
- 前置任务:任务 3 依赖于任务 0,1,那么任务 3 的前置任务是任务 0, 1
- 子任务:任务 0 执行完之后,任务 3 才能执行,那么称呼任务 3 为 任务 0 的子任务
如何构建一个有向无环图
这里我们采用 BFS 方法实现,算法思想大概是这样的
- 建立入度表,入度为 0 的节点先入队
- 当队列不为空,进行循环判断
- 节点出队,添加到结果 list 当中
- 将该节点的邻居入度减 1
- 若邻居课程入度为 0,加入队列
- 若结果 list 与所有节点数量相等,则证明不存在环。否则,存在环
多线程中,任务执行是随机的,那如何保证任务被依赖的任务先于任务执行呢?
这里要解决的主要有三个问题
- 首先我们要解决一个问题,当前任务有哪些前置任务,这个可以用 list 存储,代表它依赖的任务 list。当它所依赖的任务 list 没有执行完毕,当前任务需要等待。
- 当前任务执行完毕之后,所有依赖它的子任务需要感知到。我们可以用一个 map 来存储这种关系,key 是当前任务,value 是依赖于当前任务的集合(list)
- 多线程当中,等待和唤醒功能,有多种方式可以实现。wait、notify 机制,ReentrantLock Condition 机制,CountDownLatch 机制。这里我们选择 CountDownLatch 机制,因为 CountDownLatch 有点类似于计数器,特别适合这种场景。
具体实现
IAnchorTask
首先,我们定义一个 IAnchorTask 接口,主要有一个方法
- isRunOnMainThread(): Boolean 表示是否在主线程运行,默认值是 false
- priority(): Int 方法 表示线程的优先级别,默认值是 Process.THREAD_PRIORITY_FOREGROUND
- needWait() 表示当我们调用 AnchorTaskDispatcher await 时,是否需要等待,return true,表示需要等待改任务执行结束,AnchorTaskDispatcher await 方法才能继续往下执行。
- fun getDependsTaskList(): List<Class<out AnchorTask>>? 方法返回前置任务依赖,默认值是返回 null.
- fun run() 方法,表示任务执行的时候
interface IAnchorTask : IAnchorCallBack { /** * 是否在主线程执行 */ fun isRunOnMainThread(): Boolean /** * 任务优先级别 */ @IntRange( from = Process.THREAD_PRIORITY_FOREGROUND.toLong(), to = Process.THREAD_PRIORITY_LOWEST.toLong() ) fun priority(): Int /** * 调用 await 方法,是否需要等待改任务执行完成 * true 不需要 * false 需要 */ fun needWait(): Boolean /** * 当前任务的前置任务,可以用来确定顶点的入度 */ fun getDependsTaskList(): List<Class<out AnchorTask>>? /** * 任务被执行的时候回调 */ fun run() }
它有一个实现类 AnchorTask,增加了 await 和 countdown 方法
- await 方法,调用它,当前任务会等待
- countdown() 方法,如果当前计数器值 > 0,会减一,否则,什么也不操作
abstract class AnchorTask : IAnchorTask { private val countDownLatch: CountDownLatch = CountDownLatch(getListSize()) private fun getListSize() = getDependsTaskList()?.size ?: 0 companion object { const val TAG = "AnchorTask" } /** * self call,await */ fun await() { countDownLatch.await() } /** * parent call, countDown */ fun countdown() { countDownLatch.countDown() } }
排序实现
无环图的拓扑排序,这里采用的是 BFS 算法。具体的可以见 AnchorTaskUtils#getSortResult 方法,它有三个参数
- list 存储所有的任务
- taskMap: MutableMap<Class<out AnchorTask>, AnchorTask> = HashMap()存储所有的任务,key 是 Class,value 是 AnchorTask
- taskChildMap: MutableMap<Class<out AnchorTask>, ArrayList<Class<out AnchorTask>>?> = HashMap(),储存当前任务的子任务, key 是当前任务的 class,value 是 AnchorTask 的 list
算法思想
- 首先找出所有入度为 0 的队列,用 queue 变量存储
- 当队列不为空,进行循环判断。
- 从队列 pop 出,添加到结果队列
- 遍历当前任务的子任务,通知他们的入度减一(其实是遍历 taskChildMap),如果入度为 0,添加到队列 queue 里面
- 当结果队列和 list size 不相等试,证明有环
@JvmStatic fun getSortResult( list: MutableList<AnchorTask>, taskMap: MutableMap<Class<out AnchorTask>, AnchorTask>, taskChildMap: MutableMap<Class<out AnchorTask>, ArrayList<Class<out AnchorTask>>?> ): MutableList<AnchorTask> { val result = ArrayList<AnchorTask>() // 入度为 0 的队列 val queue = ArrayDeque<AnchorTask>() val taskIntegerHashMap = HashMap<Class<out AnchorTask>, Int>() // 建立每个 task 的入度关系 list.forEach { anchorTask: AnchorTask -> val clz = anchorTask.javaClass if (taskIntegerHashMap.containsKey(clz)) { throw AnchorTaskException("anchorTask is repeat, anchorTask is $anchorTask, list is $list") } val size = anchorTask.getDependsTaskList()?.size ?: 0 taskIntegerHashMap[clz] = size taskMap[clz] = anchorTask if (size == 0) { queue.offer(anchorTask) } } // 建立每个 task 的 children 关系 list.forEach { anchorTask: AnchorTask -> anchorTask.getDependsTaskList()?.forEach { clz: Class<out AnchorTask> -> var list = taskChildMap[clz] if (list == null) { list = ArrayList<Class<out AnchorTask>>() } list.add(anchorTask.javaClass) taskChildMap[clz] = list } } // 使用 BFS 方法获得有向无环图的拓扑排序 while (!queue.isEmpty()) { val anchorTask = queue.pop() result.add(anchorTask) val clz = anchorTask.javaClass taskChildMap[clz]?.forEach { // 遍历所有依赖这个顶点的顶点,移除该顶点之后,如果入度为 0,加入到改队列当中 var result = taskIntegerHashMap[it] ?: 0 result-- if (result == 0) { queue.offer(taskMap[it]) } taskIntegerHashMap[it] = result } } // size 不相等,证明有环 if (list.size != result.size) { throw AnchorTaskException("Ring appeared,Please check.list is $list, result is $result") } return result }
AnchorTaskDispatcher
AnchorTaskDispatcher 这个类很重要,有向无环图的拓扑排序和多线程的依赖唤醒,都是借助这个核心类完成的。
它主要有几个成员变量
// 存储所有的任务 private val list: MutableList<AnchorTask> = ArrayList() // 存储所有的任务,key 是 Class<out AnchorTask>,value 是 AnchorTask private val taskMap: MutableMap<Class<out AnchorTask>, AnchorTask> = HashMap() // 储存当前任务的子任务, key 是当前任务的 class,value 是 AnchorTask 的 list private val taskChildMap: MutableMap<Class<out AnchorTask>, ArrayList<Class<out AnchorTask>>?> = HashMap() // 拓扑排序之后的主线程任务 private val mainList: MutableList<AnchorTask> = ArrayList() // 拓扑排序之后的子线程任务 private val threadList: MutableList<AnchorTask> = ArrayList() //需要等待的任务总数,用于阻塞 private lateinit var countDownLatch: CountDownLatch //需要等待的任务总数,用于CountDownLatch private val needWaitCount: AtomicInteger = AtomicInteger()
它有一个比较重要的方法 setNotifyChildren(anchorTask: AnchorTask) ,有一个方法参数 AnchorTask,它的作用是通知该任务的子任务,当前任务执行完毕,入度数减一。
/** * 通知 child countdown,当前的阻塞任务书也需要 countdown */ fun setNotifyChildren(anchorTask: AnchorTask) { taskChildMap[anchorTask::class.java]?.forEach { taskMap[it]?.countdown() } if (anchorTask.needWait()) { countDownLatch.countDown() } }
接下来看一下 start 方法
fun start(): AnchorTaskDispatcher { if (Looper.myLooper() != Looper.getMainLooper()) { throw AnchorTaskException("start method should be call on main thread") } startTime = System.currentTimeMillis() val sortResult = AnchorTaskUtils.getSortResult(list, taskMap, taskChildMap) LogUtils.i(TAG, "start: sortResult is $sortResult") sortResult.forEach { if (it.isRunOnMainThread()) { mainList.add(it) } else { threadList.add(it) } } countDownLatch = CountDownLatch(needWaitCount.get()) val threadPoolExecutor = this.threadPoolExecutor ?: TaskExecutorManager.instance.cpuThreadPoolExecutor threadList.forEach { threadPoolExecutor.execute(AnchorTaskRunnable(this, anchorTask = it)) } mainList.forEach { AnchorTaskRunnable(this, anchorTask = it).run() } return this }
它主要干几件事
- 检测是否在主线程,不是抛出异常,这里为什么要检测在主线程呢?主要是构建有向无环图的过程,我们必须保证是线程安全的
- 获取有向无环图的拓扑排序
- 根据拓扑排序的排序结果,执行相应的任务。可以看到在执行任务的时候,我们使用 AnchorTaskRunnable 包裹起来
class AnchorTaskRunnable( private val anchorTaskDispatcher: AnchorTaskDispatcher, private val anchorTask: AnchorTask ) : Runnable { override fun run() { Process.setThreadPriority(anchorTask.priority()) // 前置任务没有执行完毕的话,等待,执行完毕的话,往下走 anchorTask.await() anchorTask.onStart() // 执行任务 anchorTask.run() anchorTask.onFinish() // 通知子任务,当前任务执行完毕了,相应的计数器要减一。 anchorTaskDispatcher.setNotifyChildren(anchorTask) } }
AnchorTaskRunnable 有点类似于装饰者模式,多线程依赖的执行关系在这里都得到体现,只有几行代码
- 前置任务没有执行完毕的话,等待,执行完毕的话,往下走
- 执行任务
- 通知子任务,当前任务执行完毕了,相应的计数器(入度数)要减一。
总结
AnchorTask 的原理不复杂,本质是有向无环图与多线程知识的结合。
- 根据 BFS 构建出有向无环图,并得到它的拓扑排序
- 在多线程执行过程中,我们是通过任务的子任务关系和 CounDownLatch 确保先后执行关系的
- 前置任务没有执行完毕的话,等待,执行完毕的话,往下走
- 执行任务
- 通知子任务,当前任务执行完毕了,相应的计数器(入度数)要减一。
AnchorTask 源码已经更新到 github,AnchorTask。
特别鸣谢
在实现这个开源框架的时候,借鉴了以下开源框架的思想。AppStartFaster 主要是通过 ClassName 找到相应的 Task,而阿里 alpha 是通过 taskName 找到相应的 Task,并且需要指定 ITaskCreator。两种方式各有优缺点,没有优劣之说,具体看使用场景。