Kotlin Coroutines 笔记 (二)

简介: Kotlin Coroutines 笔记 (二)

协程虽然是微线程,但是并不会和某一个特定的线程绑定,它可以在A线程中执行,并经过某一个时刻的挂起(suspend),等下次调度到恢复执行的时候,很可能会在B线程中执行。


一. withContext



与 launch、async、runBlocking 类似 withContext 也属于 Coroutine builders。不过与他们不同的是,其他几个都是创建一个新的协程,而 withContext 不会创建新的协程。

withContext 允许更改协程的执行线程,withContext 在使用时需要传递一个 CoroutineContext 。

launch {
        val result1 = withContext(CommonPool) {
            delay(2000)
            1
        }
        val  result2 = withContext(CommonPool) {
            delay(1000)
            2
        }
        val  result = result1 + result2
        println(result)
    }
    Thread.sleep(5000)


执行结果:

3


withContext 可以有返回值,这一点类似 async。async 创建的协程通过 await() 方法将值返回。而 withContext 可以直接返回。

launch {
        val result1 = async {
            delay(2000)
            1
        }
        val  result2 = async {
            delay(1000)
            2
        }
        val  result = result1.await() + result2.await()
        println(result)
    }
    Thread.sleep(5000)


执行结果:

3


二. 共享线程池



在上述的例子中,withContext 使用了 CommonPool。CommonPool 继承了 CoroutineDispatcher,表示使用线程池来执行协程的任务。


image.png

CommonPool.png


CommonPool 有点类似于 RxJava 的 Schedulers.computation(),主要是用于CPU密集型的计算任务。


CommonPool 使用 pool 来执行 block。

override fun dispatch(context: CoroutineContext, block: Runnable) =
        try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
        catch (e: RejectedExecutionException) {
            timeSource.unTrackTask()
            DefaultExecutor.execute(block)
        }


如果 pool 为空,则调用 getOrCreatePoolSync() 方法来创建 pool。

@Synchronized
    private fun getOrCreatePoolSync(): Executor =
        pool ?: createPool().also { pool = it }


此时,createPool() 方法是正在创建 pool 的方法。


首先,安全管理器不为空的话,使用 createPlainPool() 来创建 pool。


否则,尝试创建一个 ForkJoinPool,不行的话还是使用 createPlainPool() 来创建 pool。

private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        if (!usePrivatePool) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.let { return it }
        }
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?. let { return it }
        return createPlainPool()
    }


createPlainPool() 会使用 Executors.newFixedThreadPool() 来创建线程池。

private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }


CommonPool 的创建原理大致了解之后,通过源码发现 CoroutineContext 默认的 CoroutineDispatcher 就是 CommonPool。

/**
 * This is the default [CoroutineDispatcher] that is used by all standard builders like
 * [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
 *
 * It is currently equal to [CommonPool], but the value is subject to change in the future.
 */
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool


常见的 CoroutineDispatcher 还可以通过 ThreadPoolDispatcher 的 newSingleThreadContext()、newFixedThreadPoolContext()来创建,以及Executor 的扩展函数 asCoroutineDispatcher() 来创建。


在 Android 中,还可以使用UI。它顾名思义,在 Android 主线程上调度执行。


三. 可取消的协程



Job、Deferred 对象都可以取消任务。


3.1 cancel()


使用 cancel() 方法:

val job = launch {
        delay(1000)
        println("Hello World!")
    }
    job.cancel()
    println(job.isCancelled)
    Thread.sleep(2000)


执行结果:

true


true表示job已经被取消了,并没有打印"Hello World!"


3.2 cancelAndJoin()


使用 cancelAndJoin() 方法:

runBlocking<Unit> {
        val job = launch {
            repeat(100) { i ->
                println("count time: $i")
                delay(500)
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }


执行结果:

count time: 0
count time: 1
count time: 2
count time: 3
count time: 4


cancelAndJoin() 等价于使用了 cancel() 和 join()。


join() 方法用于等待已启动协程的完成,并且它不会传播其异常。 但是,崩溃的子协程也会取消其父协程,并带有相应的异常。


3.3 检查协程的取消标记


如果一个协程一直在执行计算,没有去检查取消标记,它就无法取消。即使调用了cancel() 或者 cancelAndJoin()。

runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        val job = launch {
            var tempTime = startTime
            var i = 0
            while (i < 100) {
                if (System.currentTimeMillis() >= tempTime) {
                    println("count time: ${i++}")
                    tempTime += 500L
                }
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }


上述代码仍然会打印100次。


如果使用isActive检查取消标记,则Job 或 Deferred 的任务可以被取消:

runBlocking<Unit> {
        val startTime = System.currentTimeMillis()
        val job = launch {
            var tempTime = startTime
            var i = 0
            while (isActive) {
                if (System.currentTimeMillis() >= tempTime) {
                    println("count time: ${i++}")
                    tempTime += 500L
                }
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }


执行结果:

count time: 0
count time: 1
count time: 2
count time: 3
count time: 4


isActive 是 CoroutineScope 的属性:

package kotlinx.coroutines.experimental
import kotlin.coroutines.experimental.*
import kotlin.internal.*
/**
 * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
 * and fast access to its own cancellation status via [isActive].
 */
public interface CoroutineScope {
    /**
     * Returns `true` when this coroutine is still active (has not completed and was not cancelled yet).
     *
     * Check this property in long-running computation loops to support cancellation:
     * ```
     * while (isActive) {
     *     // do some computation
     * }
     * ```
     *
     * This property is a shortcut for `coroutineContext.isActive` in the scope when
     * [CoroutineScope] is available.
     * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
     * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
     */
    public val isActive: Boolean
    /**
     * Returns the context of this coroutine.
     *
     * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
     */
    @Deprecated("Replace with top-level coroutineContext",
        replaceWith = ReplaceWith("coroutineContext",
            imports = ["kotlin.coroutines.experimental.coroutineContext"]))
    @LowPriorityInOverloadResolution
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
    public val coroutineContext: CoroutineContext
}


总结:



本文介绍了三个部分:withContext的使用,CommonPool的创建以及如何取消协程。其中,还捎带介绍了 async 和 await 的使用。

相关文章
|
25天前
|
算法 Kotlin
Kotlin教程笔记(24) -尾递归优化
Kotlin教程笔记(24) -尾递归优化
43 7
Kotlin教程笔记(24) -尾递归优化
|
26天前
|
Java Kotlin
​ Kotlin教程笔记(15) - 方法重载与默认参数
​ Kotlin教程笔记(15) - 方法重载与默认参数
29 4
​ Kotlin教程笔记(15) - 方法重载与默认参数
|
23天前
|
Java 编译器 Kotlin
Kotlin入门笔记1 - 数据类型
Kotlin入门笔记1 - 数据类型
68 15
|
25天前
|
Kotlin
Kotlin教程笔记(20) - 枚举与密封类
Kotlin教程笔记(20) - 枚举与密封类
38 8
|
25天前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
25天前
|
Java 开发工具 Android开发
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
|
25天前
|
安全 Kotlin
Kotlin教程笔记(23) -作用域函数
Kotlin教程笔记(23) -作用域函数
74 6
|
25天前
|
Kotlin
Kotlin教程笔记(21) -高阶函数与函数引用
Kotlin教程笔记(21) -高阶函数与函数引用
33 6
|
25天前
|
缓存 Kotlin Python
Kotlin教程笔记(25) -函数式编程
Kotlin教程笔记(25) -函数式编程
|
24天前
|
设计模式 Java Kotlin
Kotlin教程笔记(56) - 改良设计模式 - 装饰者模式
Kotlin教程笔记(56) - 改良设计模式 - 装饰者模式
37 2