RxJava2源码分析(三):线程调度分析3

简介: 线程调度分析

发射数据流程分析

  根据前面两篇的分析,可以知道

emitter.onNext("wizardev");

这句代码就是调用下游的onNext方法,这里就会调用SubscribeOnObserver类的onNext方法,SubscribeOnObserver类的onNext方法的源码如下

public void onNext(T t) {
            downstream.onNext(t);
        }

这里直接调用了下游的onNext方法,这个下游就是ObserveOnObserver类即这里会调用ObserveOnObserver类中的onNext方法,ObserveOnObserver类中的onNext方法代码如下

 public void onNext(T t) {
     //这里的done为初始值false
            if (done) {
                return;
            }
//sourceMode为初始值0
            if (sourceMode != QueueDisposable.ASYNC) {
//将上游发射的数据放入队列中,这个queue就是在onSubscribe方法中实例化的
                queue.offer(t);
            }
     //调用方法
            schedule();
        }

继续看schedule方法的源码。如下

 void schedule() {
            if (getAndIncrement() == 0) {
 //根据上文的分析可以知道这个worker就是HandlerScheduler的内部类
                //HandlerWorker的实例
                worker.schedule(this);
            }
        }

这里讲解一下这句

worker.schedule(this);

代码,这里的worker就是HandlerScheduler内部类HandlerWorker的实例,所以这里调用了HandlerScheduler内部类HandlerWorker的schedule方法并将ObserveOnObserver实例作为参数传入。 现在,来看HandlerWorker类中的schedule方法,代码如下

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            if (async) {
                message.setAsynchronous(true);
            }
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

这个方法中重要的就是下面这段代码

run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            if (async) {
                message.setAsynchronous(true);
            }
//将message放入messageQueue中等待轮询,这里的handler在主线程中,
//所以这里执行scheduled的run方法,其实已经切换到主线程中了
            handler.sendMessageDelayed(message, unit.toMillis(delay));

了解Handler原理的同学就会知道上面的这段代码最终会调用scheduled的run方法。不了解Handler原理的同学,可以看下我的这篇文章。这里的scheduled的run方法会调用

run = RxJavaPlugins.onSchedule(run)

;这句代码的run方法,即调用的是ObserveOnObserver类中的run方法,看下ObserveOnObserver类中的run方法的代码,如下

  public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

上面代码中的outputFused的初始值为false,所以会执行else语句中的代码,看下drainNormal的代码,如下

void drainNormal() {
            int missed = 1;
//这个queue是在onSubscribe初始化的,在onNext中将上游的数据添加进去的
            final SimpleQueue<T> q = queue;
  //将下游的observe赋值给a
            final Observer<? super T> a = downstream;
//
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
//开始轮询
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
             //轮询取出q中的值,这里的值就是在上游发射的
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
//取出的值,传递给下游的onNext方法
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

重要的代码,已经在代码中进行了注释,这里就不再讲解,上面代码的作用就是,不断取出上游发射的数据,然后调用下游的onNext方法并将取出的值传递进去。

总结

  分析到这里,算是将RxJava2线程调度的源码理清楚了。可以发现在进行线程调度的时候大量的使用Runnable,一层层的包装,然后在一层层的来调用。首先,将线程切换到工作线程中的方法是将调用上游subscribe方法放在了Runnable类中的run方法中,然后将这个Runnable层层包装后放进线程队列中等待执行,最后在工作线程中处理发射的数据。

  将线程切换到主线程中的方法是利用Handler,将处理好的数据放进一个队列中,放进队列中的这个动作还是在工作线程中完成的,然后,利用Handler将线程切换到主线程,最后不断的取出队列中的数据,不断调用下游的onNext方法。通过这种方式来完成线程的调度。

结束语

  通过上面的分析,可以发现RxJava2线程调度还是挺复杂的,牵涉到的知识点也是比较多的,为了更简单,更有条理的讲解RxJava2线程调度的原理,同时为了让大家不至于在源码中迷失,所以这里分析源码按照代码的执行顺序一步步的进行的。因为,代码执行的时候会在不同的类之间来回切换,所以,大家会发现分析的时候在各个类中跳来跳去。

  由于篇幅的原因,文章的一些知识没有详细的来讲解,如Java的线程池,Handler原理等,大家可以自己查阅相关资料或者留言一起讨论,如果发现文中有不对的地方,也欢迎指正。

相关文章
|
25天前
|
存储 SQL 监控
JAVA 线程池的分析和使用
JAVA 线程池的分析和使用
19 0
|
2月前
|
SQL Dubbo Java
案例分析|线程池相关故障梳理&总结
本文作者梳理和分享了线程池类的故障,分别从故障视角和技术视角两个角度来分析总结,故障视角可以看到现象和教训,而技术视角可以透过现象看到本质更进一步可以看看如何避免。
84687 136
案例分析|线程池相关故障梳理&总结
|
8天前
|
消息中间件 Java 数据安全/隐私保护
线程间通信的方法与比较分析
线程间通信的方法与比较分析
|
10天前
|
Java 数据库连接 调度
Java多线程,对锁机制的进一步分析
Java多线程,对锁机制的进一步分析
|
2月前
|
算法 调度
【操作系统】处理机调度的基本概念和三个层次、进程调度的时机和方式、调度器、闲逛线程
【操作系统】处理机调度的基本概念和三个层次、进程调度的时机和方式、调度器、闲逛线程
164 3
|
2月前
|
Java
【Java多线程】分析线程加锁导致的死锁问题以及解决方案
【Java多线程】分析线程加锁导致的死锁问题以及解决方案
46 1
|
2月前
|
存储 Java 调度
Java多线程基础-1:通俗简介操作系统之进程的管理与调度
操作系统是一个复杂的软件,具备许多功能。其中,进程的管理与调度是与我们密切相关的。本文将对操作系统功能中进程管理与调度作出介绍。
32 0
|
2月前
|
Java 调度
多线程的基本概念和实现方式,线程的调度,守护线程、礼让线程、插入线程
多线程的基本概念和实现方式,线程的调度,守护线程、礼让线程、插入线程
27 0
|
2月前
|
安全
并发编程之变量的线程安全分析的详细解析
并发编程之变量的线程安全分析的详细解析
18 0
|
2月前
|
存储 算法 Linux
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
148 0