一道面试题: Kotlin 中处理生产者/消费者问题的几种方式

简介: 生产者消费者问题是面试中常被问到的问题,Kotlin 继承了 Java 的所有能力同时增加了协程,本问题盘点一下 Kotlin 中处理生产者消费者问题的几种方法

image.png

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个缓冲区(Buffer),生产者往 Buffer 中添加产品,消费者从 Buffer 中取走产品,当 Buffer 为空时,消费者阻塞,当 Buffer 满时,生产者阻塞。

Kotlin 中有多种方法可以实现多线程的生产/消费模型(大多也适用于Java)

  1. Synchronized
  2. ReentrantLock
  3. BlockingQueue
  4. Semaphore
  5. PipedXXXStream
  6. RxJava
  7. Coroutine
  8. Flow

1. Synchronized

Synchronized 是最最基本的线程同步工具,配合 wait/notify 可以实现实现生产消费问题

val buffer = LinkedList<Data>()
val MAX = 5 //buffer最大size

val lock = Object()

fun produce(data: Data) {
    sleep(2000) // mock produce
    synchronized(lock) {
        while (buffer.size >= MAX) {
           // 当buffer满时,停止生产
           // 注意此处使用while不能使用if,因为有可能是被另一个生产线程而非消费线程唤醒,所以要再次检查buffer状态
           // 如果生产消费两把锁,则不必担心此问题
           lock.wait()
        }

            buffer.push(data)
        // notify方法只唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。
        // notifyAll会唤醒所有等待中线程,哪一个线程将会第一个处理取决于操作系统的实现,但是都有机会处理。
        // 此处使用notify有可能唤醒的是另一个生产线程从而造成死锁,所以必须使用notifyAll
        lock.notifyAll()
    }
}

fun consume() {
    synchronized(lock) {
        while (buffer.isEmpty())
            lock.wait() // 暂停消费
        buffer.removeFirst()
        lock.notifyAll()
    }
    sleep(2000) // mock consume
}



@Test
fun test() {
    // 同时启动多个生产、消费线程
    repeat(10) {
        Thread { produce(Data()) }.start()
    }
    repeat(10) {
        Thread { consume() }.start()
    }
}

<br/>

2. ReentrantLock


Lock 相对于 Synchronized 好处是当有多个生产线/消费线程时,我们可以通过定义多个 condition 精确指定唤醒哪一个。下面的例子展示 Lock 配合 await/single 替换前面 Synchronized 写法

val buffer = LinkedList<Data>()
val MAX = 5 //buffer最大size
             
val lock = ReentrantLock()                     
val condition = lock.newCondition()          
                                               
fun produce(data: Data) {                      
    sleep(2000) // mock produce                
    lock.lock()                                
                                               
    while (buffer.size >= 5)                      
        condition.await()                      
                                               
    buffer.push(data)                          
    condition.signalAll()                      
    lock.unlock()                              
}                                              
                                               
fun consume() {                                
    lock.lock()                                
    while (buffer.isEmpty())                      
        condition.await()                      
                                               
    buffer.removeFirst()
    condition.singleAll()                        
    lock.unlock()                              
    sleep(2000) // mock consume                
}                                              

<br/>

3. BlockingQueue (阻塞队列)

BlockingQueue在达到临界条件时,再进行读写会自动阻塞当前线程等待锁的释放,天然适合这种生产/消费场景

val buffer = LinkedBlockingQueue<Data>(5)               
                                                        
fun produce(data: Data) {                               
    sleep(2000) // mock produce                         
    buffer.put(data) //buffer满时自动阻塞                       
}
                                                        
fun consume() {                                         
    buffer.take() // buffer空时自动阻塞
    sleep(2000) // mock consume                         
}                                                       
                                                                                                             

注意 BlockingQueue 的有三组读/写方法,只有一组有阻塞效果,不要用错

方法 说明
add(o)/remove(o) add方法在添加元素的时候,若超出了度列的长度会直接抛出异常
offer(o)/poll(o) offer在添加元素时,如果发现队列已满无法添加的话,会直接返回false
put(o)/take(o) put向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素

<br/>

4. Semaphore(信号量)

Semaphore 是 JUC 提供的一种共享锁机制,可以进行拥塞控制,此特性可用来控制 buffer 的大小。

// canProduce: 可以生产数量(即buffer可用的数量),生产者调用acquire,减少permit数目    
val canProduce = Semaphore(5)                                                                                           
// canConsumer:可以消费数量,生产者调用release,增加permit数目                  
val canConsume = Semaphore(5)                                                                                      
// 控制buffer访问互斥                                                
val mutex = Semaphore(0)                                       
                                                               
val buffer = LinkedList<Data>()                                
                                                               
fun produce(data: Data) {                                      
    if (canProduce.tryAcquire()) {                             
        sleep(2000) // mock produce                            
                                                               
        mutex.acquire()                                        
        buffer.push(data)                                      
        mutex.release()                                        
                                                               
        canConsume.release() //通知消费端新增加了一个产品                   
    }                                                          
}                                                              
                                                               
fun consume() {                                                
    if (canConsume.tryAcquire()) {                             
        sleep(2000) // mock consume                            
                                                               
        mutex.acquire()                                        
        buffer.removeFirst()                                   
        mutex.release()                                        
                                                               
        canProduce.release() //通知生产端可以再追加生产                    
    }                                                          
                                                               
}                                               

<br/>

5. PipedXXXStream (管道)

Java里的管道输入/输出流 PipedInputStream / PipedOutputStream 实现了类似管道的功能,用于不同线程之间的相互通信,输入流中有一个缓冲数组,当缓冲数组为空的时候,输入流 PipedInputStream 所在的线程将阻塞

val pis: PipedInputStream = PipedInputStream()
val pos: PipedOutputStream by lazy {
    PipedOutputStream().apply {
        pis.connect(this) //输入输出流之间建立连接
    }
}

fun produce(data: ContactsContract.Data) {
    while (true) {
        sleep(2000)
        pos.use { // Kotlin 使用 use 方便的进行资源释放
            it.write(data.getBytes())
            it.flush()
        }
    }
}

fun consume() {
    while (true) {
        sleep(2000)
        pis.use {
            val byteArray = ByteArray(1024)
            it.read(byteArray)
        }
    }
}

@Test
fun Test() {
    repeat(10) {
        Thread { produce(Data()) }.start()
    }

    repeat(10) {
        Thread { consume() }.start()
    }
}

<br/>

6. RxJava

RxJava 从概念上,可以将 Observable/Subject 作为生产者, Subscriber 作为消费者, 但是无论 Subject 或是 Observable 都缺少 Buffer 溢出时的阻塞机制,难以独立实现生产者/消费者模型。

Flowable 的背压机制,可以用来控制 buffer 数量,并在上下游之间建立通信, 配合 Atomic 可以变向实现单生产者/单消费者场景,(不适用于多生产者/多消费者场景)。

class Producer : Flowable<Data>() {

    override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) {
        subscriber.onSubscribe(object : Subscription {
            override fun cancel() {
                //...
            }

            private val outStandingRequests = AtomicLong(0)

            override fun request(n: Long) { //收到下游通知,开始生产
                outStandingRequests.addAndGet(n)

                while (outStandingRequests.get() > 0) {
                    sleep(2000)
                    subscriber.onNext(Data())
                    outStandingRequests.decrementAndGet()
                }
            }

        })
    }

}



class Consumer : DefaultSubscriber<Data>() {

    override fun onStart() {
        request(1)
    }

    override fun onNext(i: Data?) {
        sleep(2000) //mock consume
        request(1) //通知上游可以增加生产
    }

    override fun onError(throwable: Throwable) {
        //...
    }

    override fun onComplete() {
        //...
    }

}


@Test
fun test_rxjava() {
    try {
        val testProducer = Producer)
        val testConsumer = Consumer()

        testProducer
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.single())
            .blockingSubscribe(testConsumer)

    } catch (t: Throwable) {
        t.printStackTrace()
    }

}

<br/>

7. Coroutine Channel

协程中的 Channel 具有拥塞控制机制,可以实现生产者消费者之间的通信。
可以把 Channel 理解为一个协程版本的阻塞队列,capacity 指定队列容量。


val channel = Channel<Data>(capacity = 5)

suspend fun produce(data: ContactsContract.Contacts.Data) = run {
    delay(2000) //mock produce
    channel.send(data)
}


suspend fun consume() = run {
    delay(2000)//mock consume
    channel.receive()
}

@Test
fun test_channel() {
    repeat(10) {
        GlobalScope.launch {
            produce(Data())
        }
    }

    repeat(10) {
        GlobalScope.launch {
           consume()
        }
    }
}

此外,Coroutine 提供了 produce 方法,在声明 Channel 的同时生产数据,写法上更简单,适合单消费者单生产者的场景:

fun CoroutineScope.produce(): ReceiveChannel<Data> = produce {
    repeat(10) {
        delay(2000) //mock produce
        send(Data())
    }
}

@Test
fun test_produce() {
    GlobalScope.launch {
        produce.consumeEach {
            delay(2000) //mock consume
        }
    }
}

8. Coroutine Flow

Flow 跟 RxJava 一样,因为缺少 Buffer 溢出时的阻塞机制,不适合处理生产消费问题,其背压机制也比较简单,无法像 RxJava 那样收到下游通知。 但是 Flow 后来发布了 SharedFlow, 作为带缓冲的热流,提供了 Buffer 溢出策略,可以用作生产者/消费者之间的同步。

val flow : MutableSharedFlow<Data> = MutableSharedFlow(
    extraBufferCapacity = 5  //缓冲大小
    , onBufferOverflow = BufferOverflow.SUSPEND // 缓冲溢出时的策略:挂起
)

@Test
fun test() {

    GlobalScope.launch {
        repeat(10) {
            delay(2000) //mock produce
            sharedFlow.emit(Data())
        }
    }

    GlobalScope.launch {
        sharedFlow.collect {
            delay(2000) //mock consume
        }
    }
}

注意 SharedFlow 也只能用在单生产者/单消费者场景

<br/>

总结

生产者/消费者问题,其本质核心还是多线程读写共享资源(Buffer)时的同步问题,理论上只要具有同步机制的多线程框架,例如线程锁、信号量、阻塞队列、协程 Channel等,都是可以实现生产消费模型的。

另外,RxJava 和 Flow 虽然也是多线程框架,但是缺少Buffer溢出时的阻塞机制,不适用于生产/消费场景,更适合在纯响应式场景中使用。

目录
相关文章
|
3月前
|
Android开发 Kotlin
Android经典面试题之Kotlin的==和===有什么区别?
本文介绍了 Kotlin 中 `==` 和 `===` 操作符的区别:`==` 用于比较值是否相等,而 `===` 用于检查对象身份。对于基本类型,两者行为相似;对于对象引用,`==` 比较值相等性,`===` 检查引用是否指向同一实例。此外,还列举了其他常用比较操作符及其应用场景。
193 93
|
2月前
|
JSON 调度 数据库
Android面试之5个Kotlin深度面试题:协程、密封类和高阶函数
本文首发于公众号“AntDream”,欢迎微信搜索“AntDream”或扫描文章底部二维码关注,和我一起每天进步一点点。文章详细解析了Kotlin中的协程、扩展函数、高阶函数、密封类及`inline`和`reified`关键字在Android开发中的应用,帮助读者更好地理解和使用这些特性。
32 1
|
2月前
|
Android开发 Kotlin
Android面试题之Kotlin中如何实现串行和并行任务?
本文介绍了 Kotlin 中 `async` 和 `await` 在并发编程中的应用,包括并行与串行任务的处理方法。并通过示例代码展示了如何启动并收集异步任务的结果。
33 0
|
2月前
|
Java 调度 Android开发
Android面试题之Kotlin中async 和 await实现并发的原理和面试总结
本文首发于公众号“AntDream”,详细解析了Kotlin协程中`async`与`await`的原理及其非阻塞特性,并提供了相关面试题及答案。协程作为轻量级线程,由Kotlin运行时库管理,`async`用于启动协程并返回`Deferred`对象,`await`则用于等待该对象完成并获取结果。文章还探讨了协程与传统线程的区别,并展示了如何取消协程任务及正确释放资源。
37 0
|
5月前
|
安全 Android开发 Kotlin
Android经典面试题之Kotlin中常见作用域函数
**Kotlin作用域函数概览**: `let`, `run`, `with`, `apply`, `also`. `let`安全调用并返回结果; `run`在上下文中执行代码并返回结果; `with`执行代码块,返回结果; `apply`配置对象后返回自身; `also`附加操作后返回自身
62 8
|
5月前
|
SQL 安全 Java
Android经典面试题之Kotlin中object关键字实现的是什么类型的单例模式?原理是什么?怎么实现双重检验锁单例模式?
Kotlin 单例模式概览 在 Kotlin 中,`object` 关键字轻松实现单例,提供线程安全的“饿汉式”单例。例如: 要延迟初始化,可使用 `companion object` 和 `lazy` 委托: 对于参数化的线程安全单例,结合 `@Volatile` 和 `synchronized`
66 6
|
5月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
61 0
|
5月前
|
Android开发 Kotlin
Android经典面试题之Kotlin中Lambda表达式有哪些用法
Kotlin的Lambda表达式是匿名函数的简洁形式,常用于集合操作和高阶函数。基本语法是`{参数 -&gt; 表达式}`。例如,`{a, b -&gt; a + b}`是一个加法lambda。它们可在`map`、`filter`等函数中使用,也可作为参数传递。单参数时可使用`it`关键字,如`list.map { it * 2 }`。类型推断简化了类型声明。
31 0
|
4月前
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
28天前
|
存储 缓存 算法
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!
本文介绍了多线程环境下的几个关键概念,包括时间片、超线程、上下文切换及其影响因素,以及线程调度的两种方式——抢占式调度和协同式调度。文章还讨论了减少上下文切换次数以提高多线程程序效率的方法,如无锁并发编程、使用CAS算法等,并提出了合理的线程数量配置策略,以平衡CPU利用率和线程切换开销。
面试官:单核 CPU 支持 Java 多线程吗?为什么?被问懵了!