面试必备:Kotlin 线程同步的 N 种方法

简介: 面试的时候经常会被问及多线程同步的问题,例如,有 Task1、Task2 等多个并行任务,如何等待全部执行完成后执行 Task3?

面试的时候经常会被问及多线程同步的问题,例如:

“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”

在 Kotlin 中我们有多种实现方式,本文将所有这些方式做了整理,建议收藏。

1. Thread.join <br/>
2. Synchronized <br/>
3. ReentrantLock<br/>
4. BlockingQueue<br/>
5. CountDownLatch<br/>
6. CyclicBarrier<br/>
7. CAS<br/>
8. Future<br/>
9. CompletableFuture<br/>
10. Rxjava<br/>
11. Coroutine<br/>
12. Flow

我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:
image.png

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Threadjoin()

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    task3(s1, s2)

}

2. Synchronized

使用 synchronized 锁进行同步

    @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }

    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized

3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

    @Test
    fun test_ReentrantLock() {

        lateinit var s1: String
        lateinit var s2: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()

    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务,

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

    @Test
    fun test_blockingQueue() {

        lateinit var s1: String
        lateinit var s2: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        s2 = task2()

        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。

5. CountDownLatch

JUC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

    @Test
    fun test_countdownlatch() {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。

CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用

    @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)

    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。
因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

     @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)

    }

while 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。

volatile

看到 CAS 的无锁实现,也许很多人会想到 volatile, 是否也能实现无锁的线程安全?

     @Test
    fun test_Volatile() {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            s1 = task1()
            cnt--
        }.start()

        Thread {
            s2 = task2()
            cnt--
        }.start()

        while (cnt != 0) {
        }

        task3(s1, s2)

    }

注意,这种写法是错误的
volatile 能保证可见性,但是不能保证原子性,cnt-- 并非线程安全,需要加锁操作

8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1s2记录结果非常不方便。
Java 1.5 开始,提供了 CallableFuture ,可以在任务执行结束时返回结果。

@Test
fun test_future() {

    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))

    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)

    task3(future1.get(), future2.get())

}

通过 future.get(),可以同步等待结果返回,写起来非常方便

9. CompletableFuture

future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}

<br/>

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求:
zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

@Test
fun test_Rxjava() {

    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()

}

11. Coroutine

前面讲了那么多,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

@Test
fun test_coroutine() {

    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }

        val c2 = async(Dispatchers.IO) {
            task2()
        }

        task3(c1.await(), c2.await())
    }
}

写起来特别舒服,可以说是集前面各类工具的优点于一身。

12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:


@Test
fun test_flow() {

    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
        
    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()

    }

}

flowOn 使得 Task 在异步计算并发射结果。

总结

上面这么多方式,就像茴香豆的“茴”字的四种写法,没必要都掌握。作为结论,在 Kotlin 上最好用的线程同步方案首推协程!

目录
相关文章
|
7天前
|
缓存 安全 Java
一文吃透面试线程必问10大问题
本文全面探讨了Java线程的十个关键面试问题,涵盖了线程的基本概念、创建方法、使用目的与好处、运行流程与状态、停止线程的正确方式、以及线程安全等高级主题。
|
5天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
5天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
11天前
|
Go 调度 开发者
[go 面试] 深入理解进程、线程和协程的概念及区别
[go 面试] 深入理解进程、线程和协程的概念及区别
|
7天前
|
调度 Android开发 开发者
【颠覆传统!】Kotlin协程魔法:解锁Android应用极速体验,带你领略多线程优化的无限魅力!
【8月更文挑战第12天】多线程对现代Android应用至关重要,能显著提升性能与体验。本文探讨Kotlin中的高效多线程实践。首先,理解主线程(UI线程)的角色,避免阻塞它。Kotlin协程作为轻量级线程,简化异步编程。示例展示了如何使用`kotlinx.coroutines`库创建协程,执行后台任务而不影响UI。此外,通过协程与Retrofit结合,实现了网络数据的异步加载,并安全地更新UI。协程不仅提高代码可读性,还能确保程序高效运行,不阻塞主线程,是构建高性能Android应用的关键。
27 4
|
11天前
|
SQL 安全 测试技术
[go 面试] 接口测试的方法与技巧
[go 面试] 接口测试的方法与技巧
|
11天前
|
存储 安全 Go
Go 面试题:string 是线程安全的吗?
Go 面试题:string 是线程安全的吗?
|
13天前
|
机器学习/深度学习 算法 Python
【机器学习】面试问答:决策树如何进行剪枝?剪枝的方法有哪些?
文章讨论了决策树的剪枝技术,包括预剪枝和后剪枝的概念、方法以及各自的优缺点。
28 2
|
13天前
|
机器学习/深度学习
【机器学习】面试题:LSTM长短期记忆网络的理解?LSTM是怎么解决梯度消失的问题的?还有哪些其它的解决梯度消失或梯度爆炸的方法?
长短时记忆网络(LSTM)的基本概念、解决梯度消失问题的机制,以及介绍了包括梯度裁剪、改变激活函数、残差结构和Batch Normalization在内的其他方法来解决梯度消失或梯度爆炸问题。
27 2
|
13天前
|
存储 机器学习/深度学习 缓存
【数据挖掘】XGBoost面试题:与GBDT的区别?为什么使用泰勒二阶展开?为什么可以并行训练?为什么快?防止过拟合的方法?如何处理缺失值?
XGBoost与GBDT的区别、XGBoost使用泰勒二阶展开的原因、并行训练的原理、速度优势、防止过拟合的策略以及处理缺失值的方法,突出了XGBoost在提升模型性能和训练效率方面的一系列优化。
17 1