RxJava2源码分析(三):线程调度分析1

简介: 线程调度分析

前言:经过前面两篇文章对RxJava2源码的分析,我们已经对RxJava2的基本流程及操作符的原理有了一定程度的认识。这篇文章将在前面两篇文章的基础上,对RxJava2的线程调度进行分析,建议先阅读前面两篇的文章,再阅读本文。

注:文章内容过多,建议在空闲时阅读。

相关文章

示例代码

  为了更好的理解RxJava2的线程调度原理,不被其他的代码所干扰,这里就只贴出与线程调度有关的代码,如下

private void threadScheduleCode() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.e("wizardev", "上游所在的线程: "+Thread.currentThread().getName());
                Thread.sleep(2*1000);
                emitter.onNext("wizardev");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
                    }
                    @Override
                    public void onNext(String s) {
                        Log.e("wizardev", "接收到上游发射的数据为: " + s);
                        Log.e("wizardev", "下游所在的线程: "+ Thread.currentThread().getName());
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }

可以看下执行这段代码后打印的日志,如下

e4536bf73c83437b30bb2dac9a5d498.png

可以发现上游和下游确实不在同一个线程中,那么RxJava2是怎么进行线程切换的呢?想知道答案,请继续阅读本文。

本文要解决的问题

  本文要解决的问题其实就一个,就是RxJava2是如何进行线程调度的?但是,围绕着这个问题又会有两个小的问题需要解决:

  1. subscribeOn是怎样将要处理的数据放到到工作线程的?
  2. observeOn是怎样将工作线程切换到主线程的?

为了能够更容易理解线程调度的原理,这里对源码分析的顺序将会按照代码的执行顺序进行分析。

subscribeOn方法分析

  因为前面的文章已经分析过了create方法,所以就直接分析subscribeOn这个方法,直接上源码,如下

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

有了前面分析源码的经验,可以知道,subscribeOn方法其实就是返回了ObservableSubscribeOn类的实例并将上游的ObservableCreate和subscribeOn方法的参数注入到了它的构造方法中。 继续看下ObservableSubscribeOn类的源码,如下

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //...
    //省略部分源码
}

从源码中可以看到,这里分别将ObservableCreate类的实例以及subscribeOn方法的参数即Schedulers.io()作为了ObservableSubscribeOn类的成员变量。 好了,上面的这些就是执行

subscribeOn(Schedulers.io())

这句代码所做的事情了,下面来看下

observeOn(AndroidSchedulers.mainThread())

这句代码所做的事情。


相关文章
|
2月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
62 1
|
2月前
线程CPU异常定位分析
【10月更文挑战第3天】 开发过程中会出现一些CPU异常升高的问题,想要定位到具体的位置就需要一系列的分析,记录一些分析手段。
81 0
|
23天前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
41 4
|
1月前
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
3月前
|
存储 Java 数据处理
进程中的线程调度
进程是应用程序运行的基本单位,包括主线程、用户线程和守护线程。计算机由存储器和处理器协同操作,操作系统设计为分时和分任务模式。在个人PC普及后,基于用户的时间片异步任务操作系统确保了更好的体验和性能。线程作为进程的调度单元,通过覆写`Thread`类的`run`方法来处理任务数据,并由系统调度框架统一管理。微服务架构进一步将应用分解为多个子服务,在不同节点上执行,提高数据处理效率与容错性,特别是在大规模数据存储和处理中表现显著。例如,利用微服务框架可以优化算法,加速业务逻辑处理,并在不同区块间分配海量数据存储任务。
|
3月前
|
并行计算 API 调度
探索Python中的并发编程:线程与进程的对比分析
【9月更文挑战第21天】本文深入探讨了Python中并发编程的核心概念,通过直观的代码示例和清晰的逻辑推理,引导读者理解线程与进程在解决并发问题时的不同应用场景。我们将从基础理论出发,逐步过渡到实际案例分析,旨在揭示Python并发模型的内在机制,并比较它们在执行效率、资源占用和适用场景方面的差异。文章不仅适合初学者构建并发编程的基础认识,同时也为有经验的开发者提供深度思考的视角。
|
4月前
|
存储 监控 Java
|
4月前
|
安全 Java 开发者
Swing 的线程安全分析
【8月更文挑战第22天】
72 4
|
4月前
|
Java 数据库连接 数据库
当线程中发生异常时的情况分析
【8月更文挑战第22天】
133 4