RxJava2源码分析(三):线程调度分析1

简介: 线程调度分析

前言:经过前面两篇文章对RxJava2源码的分析,我们已经对RxJava2的基本流程及操作符的原理有了一定程度的认识。这篇文章将在前面两篇文章的基础上,对RxJava2的线程调度进行分析,建议先阅读前面两篇的文章,再阅读本文。

注:文章内容过多,建议在空闲时阅读。

相关文章

示例代码

  为了更好的理解RxJava2的线程调度原理,不被其他的代码所干扰,这里就只贴出与线程调度有关的代码,如下

private void threadScheduleCode() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.e("wizardev", "上游所在的线程: "+Thread.currentThread().getName());
                Thread.sleep(2*1000);
                emitter.onNext("wizardev");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
                    }
                    @Override
                    public void onNext(String s) {
                        Log.e("wizardev", "接收到上游发射的数据为: " + s);
                        Log.e("wizardev", "下游所在的线程: "+ Thread.currentThread().getName());
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }

可以看下执行这段代码后打印的日志,如下

e4536bf73c83437b30bb2dac9a5d498.png

可以发现上游和下游确实不在同一个线程中,那么RxJava2是怎么进行线程切换的呢?想知道答案,请继续阅读本文。

本文要解决的问题

  本文要解决的问题其实就一个,就是RxJava2是如何进行线程调度的?但是,围绕着这个问题又会有两个小的问题需要解决:

  1. subscribeOn是怎样将要处理的数据放到到工作线程的?
  2. observeOn是怎样将工作线程切换到主线程的?

为了能够更容易理解线程调度的原理,这里对源码分析的顺序将会按照代码的执行顺序进行分析。

subscribeOn方法分析

  因为前面的文章已经分析过了create方法,所以就直接分析subscribeOn这个方法,直接上源码,如下

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

有了前面分析源码的经验,可以知道,subscribeOn方法其实就是返回了ObservableSubscribeOn类的实例并将上游的ObservableCreate和subscribeOn方法的参数注入到了它的构造方法中。 继续看下ObservableSubscribeOn类的源码,如下

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //...
    //省略部分源码
}

从源码中可以看到,这里分别将ObservableCreate类的实例以及subscribeOn方法的参数即Schedulers.io()作为了ObservableSubscribeOn类的成员变量。 好了,上面的这些就是执行

subscribeOn(Schedulers.io())

这句代码所做的事情了,下面来看下

observeOn(AndroidSchedulers.mainThread())

这句代码所做的事情。


相关文章
|
6月前
|
Java 调度 开发者
【JavaSE专栏84】线程让步,一种线程调度的机制
【JavaSE专栏84】线程让步,一种线程调度的机制
|
1月前
|
资源调度 算法 Linux
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
118 0
|
1月前
|
算法 Linux 调度
Linux 线程介绍:介绍Linux系统中线程的基本概念、创建和调度机制
Linux 线程介绍:介绍Linux系统中线程的基本概念、创建和调度机制
26 0
|
7月前
|
资源调度 算法 Java
Java线程常用调度算法与应用
Java线程常用调度算法与应用
99 0
|
7月前
|
安全 Java 调度
进程与线程的关系,进程调度的基本过程
进程是操作系统分配资源的基本单位,每个进程都有自己的内存空间,独立分配的CPU时间片,以及其他系统资源。 线程共享所属进程的资源,它们通常更轻量级,创建和切换线程的开销较小。
52 0
|
3月前
|
算法 程序员 调度
操作系统:线程同步和调度
操作系统:线程同步和调度
25 0
|
4月前
|
XML Java 调度
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
Android App网络通信中通过runOnUiThread快速操纵界面以及利用线程池Executor调度异步任务实战(附源码 简单易懂)
31 0
|
4月前
|
Java Unix Linux
认真学习Java中线程实现和调度
认真学习Java中线程实现和调度
46 0
|
5月前
|
Linux 调度
Linux线程调度实验
Linux线程调度实验
25 0