概览
我们在 java 中处理并发是家常便饭,但是协程的并发你有没有想过呢,协程是否也有java一样的并发问题?
我们知道协程是轻量级的进程,而且是可以多线程调度的。那么想想这样一个情景:
我们开启1000个协程,每个协程中对count进行自增,协程执行完成后能否拿到count==1000的结果,答案在后面的章节中,最终结论就是kkotlin也是需要处理并发的
那么这种并发该如何处理呢,我想先给你说的是,协程有自己的一套并发规则,你应该试图优先用 kotlin 的并发方法来处理协程的并发
本文主要讲几种协程处理并发的例子
模拟协程并发
模拟案例
既然是讲协程的并发,那么我们首先应该证明一下协程中存在并发问题,对吧,那就来吧。
先看一下模拟的代码:
- 代码
fun concurrent(){ val scope = CoroutineScope(Dispatchers.Default)//创建协程作用域,Default支持并发 var count=0 repeat(1000){//重复1000次,每次开启一个协程,count自增1 scope.launch { count++ println(count) } } } fun main() = runBlocking { concurrent() } 复制代码
本例中,我们模拟了这样一个场景,使用自定义的作用域 scope 短时间内开启 1000 个协程,协程中每次把 count 自增 1 打印数据。如果最终打印的数据是 1000 那么说明不存在并发,如果打印的数据小于 1000,那么说明协程中也存在如 java 一样的并发问题。
- 日志
这里只发出最后一部分的日志:
696 695 694 693 692 691 690 703 702 复制代码
- 结论
日志打印最终验证协程中也存在并发性的问题,那么后面的章节我们就拿出我们的协程解决方案吧。
模拟案例扩展
我们将本例中的代码做一个扩展,在concurrent方法尾部加一个1s的延时
- 代码
- 输出日志
999 1000 978 995 994 993 992 991 1000 复制代码
- 结论
当最后一个协程执行完成时可以计算出1000这个正确结果但是打印的中间值顺序却是错乱的(这一点和java不一样),但是我们在业务代码中却没有办法获取这个正确结果产生的时间,所以我们依然要处理并发问题
解决并发的方法
解决并发的问题既可以使用 java 中的部分方式,也可以使用 kotlin 自有的方式。协程自有的处理方法适用性更广一些。
java 方式解决
如果我们想使用java的方式解决并发,那么我们可以尝试让协程运行在单线程中,也可以尝试使用Atomic方式
单线程解决方法
我们将 模拟并发章节的代码进行一些改动试试:
- 代码
fun concurrent() { val scope = CoroutineScope(Dispatchers.Unconfined)//创建协程作用域,使用Unconfined,这样在协程被挂起前都不会改变线程,也就是说协程始终运行在单线程中 var count = 0 repeat(1000) {//重复1000次,每次开启一个协程,count自增1 scope.launch { println("线程id:${Thread.currentThread().id}")//这个线程始终不会变,除非你在这里挂起 count++ println(count) } } } 复制代码
- 日志
994 线程id:1 995 线程id:1 996 线程id:1 997 线程id:1 998 线程id:1 999 线程id:1 1000 复制代码
- 结论
本例代码中因为launch始终运行在单线程中所以最终输出count==1000,中间值的顺序也没有错乱
使用Atomic方式解决
上一小节中我们的代码始终运行在单线程中,所以最终输出正确结果,那么如果我们的代码不允许在单线程中有没有解决方法呢,答案当然是有, Atomic机制即可解决。
试想一下如果我们把上面的代码scope.launch 改为launch(Dispatchers.Default) ,那么线程就变成了运行在多线程中了,此时输出的最终结果一定不会是我们想要的(我已经帮你试过了,就不贴代码和日志了)
那么我们可以尝试用Atomic方式解决这个问题
- 代码
suspend fun concurrent() { var count =AtomicInteger(0) coroutineScope { repeat(10000) {//重复1000次,每次开启一个协程,count自增1 launch(Dispatchers.Default) { count.incrementAndGet() println("计算中间值$count") } } } delay(1000) println("计算结果:$count") } 复制代码
- 日志
计算结果:10000 复制代码
- 结论
虽然最终输出结果是正确的
但是对于负责的状态Atomic明显是处理不了的,负责的状态可以考虑后面要讲的Mutex方式
kotlin 方式解决
互斥
kotlin 为我们提供了Mutex实现线程安全,Mutex通俗点来说就是kotlin的锁,和java 的synchronized和RecentLock对应。
使用mutex.withLock {*} 即可实现数据的同步
看代码:
- 代码
val mutex = Mutex() suspend fun concurrent() { var count =0 coroutineScope { repeat(10000) {//重复1000次,每次开启一个协程,count自增1 launch(Dispatchers.Default) { mutex.withLock { count++ } println("中间值:$count") } } } delay(1000) println("计算结果:$count") } 复制代码
- 日志
**** 中间值:9993 中间值:9994 中间值:9995 中间值:9996 中间值:9997 中间值:9998 中间值:9999 中间值:10000 计算结果:10000 复制代码
- 结论
使用Mutex方式解决并发,最终输出正确结果,并且中间值也是按顺序输出
结尾
掌握上面的几种方式基本上能处理大部分业务逻辑了
如果你有其他方案欢迎留言,我会及时补充