容易被忽略的知识点:RxJava操作符的线程安全

简介: RxJava操作符大多不是线程安全的,如果所以写代码时不注意这一点很容易发生bug。本文将分享如何写成更安全的Rxjava代码

image.png

随着RxJava的深入使用,渐渐发现了一个令人不安的真相:

"RxJava的大部分操作符并非线程安全的。"

在一些多线程场景下对RxJava的滥用会发生不符合预期的现象。

Rx操作符并非线程安全

很多人对RxJava的定义是一个异步响应式框架,既然是为异步处理而生的框架线程不安全?

是的,支持异步并不意味着支持并发

线程不安全会发生什么问题呢?RxJava中的大部分操作符都是线程不安全的,
当多线程同时像一个stream发射数据时,操作符的结果可能不符合预期。

以一个常用的操作符 take(n) 为例:

@JvmStatic
fun main(args: Array<String>) {
   val numberOfThreads = 10
   repeat(1000) {
       println("Iteration = $it")

       val publishSubject = PublishSubject.create<Int>() 
       
       val actuallyReceived = AtomicInteger()

       publishSubject.take(3).subscribe { 
               actuallyReceived.incrementAndGet() 
       }

       val latch = CountDownLatch(numberOfThreads)
       var threads = listOf<Thread>()

       (0..numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }

        threads.forEach { it.start() }
        latch.await()

        check(actuallyReceived.get() == 3)
    }
}

执行上面代码,由于take的结果不符合预期,总是会异常退出

在这里插入图片描述
看一下take的源码:

public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;

    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }
    protected void subscribeActual(Observer<? super T> observer) {
        this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        boolean done;
        Disposable upstream;
        long remaining;

        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

   
        public void onNext(T t) {
            if (!this.done && this.remaining-- > 0L) {
                boolean stop = this.remaining == 0L;
                this.downstream.onNext(t);
                if (stop) {
                    this.onComplete();
                }
            }

        }
    }
}

果然不出所料,remaining--没有任何锁操作,无法保证线程安全。

<br/>

The Observable Contract

Rx在对Observable的定义中已经明确告诉我们了:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

http://reactivex.io/documentation/contract.html

happens-before relationship 需要我们保证进入stream数据的先后顺序,避免并发行为。根据官方的解释,这样做可以避免一些有锁操作带来的性能下降,因此仅在必要的时候才确保线程安全。

操作符的线程安全

那么哪些操作符是线程安全的呢?

RxJava的操作符种类繁多,一个一个记忆很难,基本上可以按照这个原则区分:

  • 操作单个Observable的操作符都不是线程不安全的,例如常用的 take(n)map()distinctUntilChanged() 等,但是带有scheduler参数的除外,例如 window(…, scheduler)debounce(…, scheduler)
  • 操作多个Observable的操作符是线程安全的,例如 merge()combineLatest()zip()

用代码描述大概是这种感觉:

fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&  
    operator.supportsScheduling == false) {
    Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
    Operator.MOST_LIKELY_THREAD_SAFE
}

<br/>

Subject的线程安全

相对于操作符的线程安全,个人认为Subject的使用更需要大家注意。常用的Subject都不是线程安全的(SerializedSubject除外),而最容易出现并发操作的场景恰恰是Subject,例如我们经常会使用Subject作为中继器,异步onNext向Subject发射数据。前面take的例子便是这种场景。

更要命的是我们常配合observeOn来进行线程切换,而observeOn本身也并非线程安全的,翻看其源码会发现,observeOn在切线程时使用了一个线程不安全的队列

queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);

因此,下面的代码在并发环境中必然会发生问题:

@JvmStatic
fun main(args: Array<String>) {
    val numberOfThreads = 10000

       val publishSubject = PublishSubject.create<Int>()
    val actuallyReceived = AtomicInteger()

    publishSubject
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            actuallyReceived.incrementAndGet()
        }

    val latch = CountDownLatch(numberOfThreads)
    var threads = listOf<Thread>()

    (0..numberOfThreads).forEach {
        threads += thread(start = false) {
            publishSubject.onNext(it)
            latch.countDown()
        }
    }

    threads.forEach { it.start() }
    latch.await()

    print("actuallyReceived: $actuallyReceived")

}

由于observeOn切了线程,结果总是会漏掉几个数据

<br/>

使用SerializedSubject

在并发环境中,使用toSerialized转成SerializedSubject,可以避免上述问题

最后

RxJava处于设计和实现上的考虑,很多操作符以及Subject都不是线程安全的,作为开发者需要尽量遵守The Observable Contract,避免在并发环境下使用,如果出现并发使用时,使用Observable.serialize() 或者 Subject.toSerialized() 保证线程安全。

The Observable Contract: <br/>
http://reactivex.io/documentation/contract.html

目录
相关文章
|
Java
Java线程知识点总结
Java线程知识点总结
66 0
|
8月前
|
安全 Java
Java多线程(全知识点)(下)
Java多线程(全知识点)(下)
74 0
|
2月前
|
监控 Java
JavaGuide知识点整理——线程池的最佳实践
总之,合理使用和配置线程池是提高 Java 程序性能和稳定性的重要手段。遵循最佳实践,可以更好地发挥线程池的作用,提升系统的运行效率。同时,要不断地进行监控和优化,以适应不同的业务需求和环境变化。
130 63
|
4月前
|
存储 消息中间件 资源调度
「offer来了」进程线程有啥关系?10个知识点带你巩固操作系统基础知识
该文章总结了操作系统基础知识中的十个关键知识点,涵盖了进程与线程的概念及区别、进程间通信方式、线程同步机制、死锁现象及其预防方法、进程状态等内容,并通过具体实例帮助理解这些概念。
「offer来了」进程线程有啥关系?10个知识点带你巩固操作系统基础知识
|
3月前
|
Java Python
python知识点100篇系列(16)-python中如何获取线程的返回值
【10月更文挑战第3天】本文介绍了两种在Python中实现多线程并获取返回值的方法。第一种是通过自定义线程类继承`Thread`类,重写`run`和`join`方法来实现;第二种则是利用`concurrent.futures`库,通过`ThreadPoolExecutor`管理线程池,简化了线程管理和结果获取的过程,推荐使用。示例代码展示了这两种方法的具体实现方式。
python知识点100篇系列(16)-python中如何获取线程的返回值
|
8月前
|
Java 调度
Java多线程(全知识点)(上)
Java多线程(全知识点)
87 0
|
7月前
|
安全 Java 编译器
JAVA-多线程知识点总结(二)
JAVA-多线程知识点总结(二)
|
8月前
|
存储 安全 Java
并发编程知识点(volatile、JMM、锁、CAS、阻塞队列、线程池、死锁)
并发编程知识点(volatile、JMM、锁、CAS、阻塞队列、线程池、死锁)
122 3
|
6月前
|
存储 并行计算 安全
Java面试题:Java内存管理、多线程与并发框架的面试题解析与知识点梳理,深入Java内存模型与垃圾回收机制,Java多线程机制与线程安全,Java并发工具包与框架的应用
Java面试题:Java内存管理、多线程与并发框架的面试题解析与知识点梳理,深入Java内存模型与垃圾回收机制,Java多线程机制与线程安全,Java并发工具包与框架的应用
100 0
|
7月前
|
Oracle Java 关系型数据库
面试知识点:notify是随机唤醒线程吗(唤醒线程顺序)?
面试知识点:notify是随机唤醒线程吗(唤醒线程顺序)?
232 0