面试必备:Kotlin 线程同步的 N 种方法

简介: 面试必备:Kotlin 线程同步的 N 种方法

面试的时候经常会被问及多线程同步的问题,例如:

“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”

在 Kotlin 中我们有多种实现方式,本文将所有这些方式做了整理,建议收藏。

1. Thread.join

2. Synchronized

3. ReentrantLock

4. BlockingQueue

5. CountDownLatch

6. CyclicBarrier

7. CAS

8. Future

9. CompletableFuture

10. Rxjava

11. Coroutine

12. Flow

我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:

image.png

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Threadjoin()

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String
    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    task3(s1, s2)
}

2. Synchronized

使用 synchronized 锁进行同步

  @Test
    fun test_synchrnoized() {
        lateinit var s1: String
        lateinit var s2: String
        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()
        synchronized(Unit) {
            task3(s1, s2)
        }
    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized


3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

  @Test
    fun test_ReentrantLock() {
        lateinit var s1: String
        lateinit var s2: String
        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()
        lock.lock()
        task3(s1, s2)
        lock.unlock()
    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务,

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

  @Test
    fun test_blockingQueue() {
        lateinit var s1: String
        lateinit var s2: String
        val queue = SynchronousQueue<Unit>()
        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()
        s2 = task2()
        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。


5. CountDownLatch

JUC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

  @Test
    fun test_countdownlatch() {
        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()
        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()
        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松


6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。

CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用

  @Test
    fun test_CyclicBarrier() {
        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)
        Thread {
            s1 = task1()
            cb.await()
        }.start()
        Thread() {
            s2 = task1()
            cb.await()
        }.start()
        cb.await()
        task3(s1, s2)
    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

  @Test
    fun test_cas() {
        lateinit var s1: String
        lateinit var s2: String
        val cas = AtomicInteger(2)
        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()
        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()
        while (cas.get() != 0) {}
        task3(s1, s2)
    }

While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。


volatile

看到 CAS 的无锁实现,也许很多人会想到 volatile, 是否也能实现无锁的线程安全?

  @Test
    fun test_Volatile() {
        lateinit var s1: String
        lateinit var s2: String
        Thread {
            s1 = task1()
            cnt--
        }.start()
        Thread {
            s2 = task2()
            cnt--
        }.start()
        while (cnt != 0) {
        }
        task3(s1, s2)
    }

注意,这种写法是错误的 volatile 能保证可见性,但是不能保证原子性,cnt-- 并非线程安全,需要加锁操作


8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1s2记录结果非常不方便。 Java 1.5 开始,提供了 CallableFuture ,可以在任务执行结束时返回结果。

@Test
fun test_future() {
    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))
    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)
    task3(future1.get(), future2.get())
}

通过 future.get(),可以同步等待结果返回,写起来非常方便


9. CompletableFuture

Future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

@Test
fun test_Rxjava() {
    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()
}

11. Coroutine

前面讲了那么多,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

@Test
fun test_coroutine() {
    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }
        val c2 = async(Dispatchers.IO) {
            task2()
        }
        task3(c1.await(), c2.await())
    }
}

写起来特别舒服,可以说是集前面各类工具的优点于一身。


12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:

@Test
fun test_flow() {
    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()
    }
}

FlowOn 使得 Task 在异步计算并发射结果。


总结

上面这么多方式,就像茴香豆的“茴”字的四种写法,没必要都掌握。作为结论,在 Kotlin 上最好用的线程同步方案首推协程!

目录
相关文章
|
7天前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
11 3
|
7天前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
9 2
|
7天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
10 1
|
7天前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
18 1
|
7天前
|
Java
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅。它们用于线程间通信,使线程能够协作完成任务。通过这些方法,生产者和消费者线程可以高效地管理共享资源,确保程序的有序运行。正确使用这些方法需要遵循同步规则,避免虚假唤醒等问题。示例代码展示了如何在生产者-消费者模型中使用`wait()`和`notify()`。
13 1
|
7天前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
17 1
|
7天前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
14 1
|
12天前
|
监控 Java
在实际应用中选择线程异常捕获方法的考量
【10月更文挑战第15天】选择最适合的线程异常捕获方法需要综合考虑多种因素。没有一种方法是绝对最优的,需要根据具体情况进行权衡和选择。在实际应用中,还需要不断地实践和总结经验,以提高异常处理的效果和程序的稳定性。
17 3
|
12天前
|
监控 Java
捕获线程执行异常的多种方法
【10月更文挑战第15天】捕获线程执行异常的方法多种多样,每种方法都有其特点和适用场景。在实际开发中,需要根据具体情况选择合适的方法或结合多种方法来实现全面有效的线程异常捕获。这有助于提高程序的健壮性和稳定性,减少因线程异常带来的潜在风险。
10 1
|
16天前
|
调度 Android开发 开发者
构建高效Android应用:探究Kotlin多线程优化策略
【10月更文挑战第11天】本文探讨了如何在Kotlin中实现高效的多线程方案,特别是在Android应用开发中。通过介绍Kotlin协程的基础知识、异步数据加载的实际案例,以及合理使用不同调度器的方法,帮助开发者提升应用性能和用户体验。
36 4