面试必备:Kotlin 线程同步的 N 种方法

简介: 面试必备:Kotlin 线程同步的 N 种方法

面试的时候经常会被问及多线程同步的问题,例如:

“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”

在 Kotlin 中我们有多种实现方式,本文将所有这些方式做了整理,建议收藏。

1. Thread.join

2. Synchronized

3. ReentrantLock

4. BlockingQueue

5. CountDownLatch

6. CyclicBarrier

7. CAS

8. Future

9. CompletableFuture

10. Rxjava

11. Coroutine

12. Flow

我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:

image.png

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Threadjoin()

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String
    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    task3(s1, s2)
}

2. Synchronized

使用 synchronized 锁进行同步

  @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        lateinit var s2: String
        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()
        synchronized(Unit) {
            task3(s1, s2)
        }
    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized


3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

  @Test
    fun test_ReentrantLock() {
        lateinit var s1: String
        lateinit var s2: String
        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()
        lock.lock()
        task3(s1, s2)
        lock.unlock()
    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务,

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

  @Test
    fun test_blockingQueue() {
        lateinit var s1: String
        lateinit var s2: String
        val queue = SynchronousQueue<Unit>()
        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()
        s2 = task2()
        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。


5. CountDownLatch

JUC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

  @Test
    fun test_countdownlatch() {
        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()
        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()
        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松


6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。

CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用

  @Test
    fun test_CyclicBarrier() {
        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)
        Thread {
            s1 = task1()
            cb.await()
        }.start()
        Thread() {
            s2 = task1()
            cb.await()
        }.start()
        cb.await()
        task3(s1, s2)
    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

  @Test
    fun test_cas() {
        lateinit var s1: String
        lateinit var s2: String
        val cas = AtomicInteger(2)
        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()
        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()
        while (cas.get() != 0) {}
        task3(s1, s2)
    }

While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。


volatile

看到 CAS 的无锁实现,也许很多人会想到 volatile, 是否也能实现无锁的线程安全?

  @Test
    fun test_Volatile() {
        lateinit var s1: String
        lateinit var s2: String
        Thread {
            s1 = task1()
            cnt--
        }.start()
        Thread {
            s2 = task2()
            cnt--
        }.start()
        while (cnt != 0) {
        }
        task3(s1, s2)
    }

注意,这种写法是错误的 volatile 能保证可见性,但是不能保证原子性,cnt-- 并非线程安全,需要加锁操作


8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1s2记录结果非常不方便。 Java 1.5 开始,提供了 CallableFuture ,可以在任务执行结束时返回结果。

@Test
fun test_future() {
    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))
    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)
    task3(future1.get(), future2.get())
}

通过 future.get(),可以同步等待结果返回,写起来非常方便


9. CompletableFuture

Future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

@Test
fun test_Rxjava() {
    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()
}

11. Coroutine

前面讲了那么多,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

@Test
fun test_coroutine() {
    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }
        val c2 = async(Dispatchers.IO) {
            task2()
        }
        task3(c1.await(), c2.await())
    }
}

写起来特别舒服,可以说是集前面各类工具的优点于一身。


12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:

@Test
fun test_flow() {
    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()
    }
}

FlowOn 使得 Task 在异步计算并发射结果。


总结

上面这么多方式,就像茴香豆的“茴”字的四种写法,没必要都掌握。作为结论,在 Kotlin 上最好用的线程同步方案首推协程!

目录
相关文章
|
7天前
|
缓存 安全 Java
一文吃透面试线程必问10大问题
本文全面探讨了Java线程的十个关键面试问题,涵盖了线程的基本概念、创建方法、使用目的与好处、运行流程与状态、停止线程的正确方式、以及线程安全等高级主题。
|
6天前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
6天前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
12天前
|
Go 调度 开发者
[go 面试] 深入理解进程、线程和协程的概念及区别
[go 面试] 深入理解进程、线程和协程的概念及区别
|
8天前
|
调度 Android开发 开发者
【颠覆传统!】Kotlin协程魔法:解锁Android应用极速体验,带你领略多线程优化的无限魅力!
【8月更文挑战第12天】多线程对现代Android应用至关重要,能显著提升性能与体验。本文探讨Kotlin中的高效多线程实践。首先,理解主线程(UI线程)的角色,避免阻塞它。Kotlin协程作为轻量级线程,简化异步编程。示例展示了如何使用`kotlinx.coroutines`库创建协程,执行后台任务而不影响UI。此外,通过协程与Retrofit结合,实现了网络数据的异步加载,并安全地更新UI。协程不仅提高代码可读性,还能确保程序高效运行,不阻塞主线程,是构建高性能Android应用的关键。
27 4
|
12天前
|
SQL 安全 测试技术
[go 面试] 接口测试的方法与技巧
[go 面试] 接口测试的方法与技巧
|
12天前
|
存储 安全 Go
Go 面试题:string 是线程安全的吗?
Go 面试题:string 是线程安全的吗?
|
13天前
|
机器学习/深度学习 算法 Python
【机器学习】面试问答:决策树如何进行剪枝?剪枝的方法有哪些?
文章讨论了决策树的剪枝技术,包括预剪枝和后剪枝的概念、方法以及各自的优缺点。
29 2
|
13天前
|
机器学习/深度学习
【机器学习】面试题:LSTM长短期记忆网络的理解?LSTM是怎么解决梯度消失的问题的?还有哪些其它的解决梯度消失或梯度爆炸的方法?
长短时记忆网络(LSTM)的基本概念、解决梯度消失问题的机制,以及介绍了包括梯度裁剪、改变激活函数、残差结构和Batch Normalization在内的其他方法来解决梯度消失或梯度爆炸问题。
27 2
|
13天前
|
存储 机器学习/深度学习 缓存
【数据挖掘】XGBoost面试题:与GBDT的区别?为什么使用泰勒二阶展开?为什么可以并行训练?为什么快?防止过拟合的方法?如何处理缺失值?
XGBoost与GBDT的区别、XGBoost使用泰勒二阶展开的原因、并行训练的原理、速度优势、防止过拟合的策略以及处理缺失值的方法,突出了XGBoost在提升模型性能和训练效率方面的一系列优化。
17 1