RxJava2源码分析(三):线程调度分析3

简介: 线程调度分析

发射数据流程分析

  根据前面两篇的分析,可以知道

emitter.onNext("wizardev");

这句代码就是调用下游的onNext方法,这里就会调用SubscribeOnObserver类的onNext方法,SubscribeOnObserver类的onNext方法的源码如下

public void onNext(T t) {
            downstream.onNext(t);
        }

这里直接调用了下游的onNext方法,这个下游就是ObserveOnObserver类即这里会调用ObserveOnObserver类中的onNext方法,ObserveOnObserver类中的onNext方法代码如下

 public void onNext(T t) {
     //这里的done为初始值false
            if (done) {
                return;
            }
//sourceMode为初始值0
            if (sourceMode != QueueDisposable.ASYNC) {
//将上游发射的数据放入队列中,这个queue就是在onSubscribe方法中实例化的
                queue.offer(t);
            }
     //调用方法
            schedule();
        }

继续看schedule方法的源码。如下

 void schedule() {
            if (getAndIncrement() == 0) {
 //根据上文的分析可以知道这个worker就是HandlerScheduler的内部类
                //HandlerWorker的实例
                worker.schedule(this);
            }
        }

这里讲解一下这句

worker.schedule(this);

代码,这里的worker就是HandlerScheduler内部类HandlerWorker的实例,所以这里调用了HandlerScheduler内部类HandlerWorker的schedule方法并将ObserveOnObserver实例作为参数传入。 现在,来看HandlerWorker类中的schedule方法,代码如下

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            if (async) {
                message.setAsynchronous(true);
            }
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

这个方法中重要的就是下面这段代码

run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            if (async) {
                message.setAsynchronous(true);
            }
//将message放入messageQueue中等待轮询,这里的handler在主线程中,
//所以这里执行scheduled的run方法,其实已经切换到主线程中了
            handler.sendMessageDelayed(message, unit.toMillis(delay));

了解Handler原理的同学就会知道上面的这段代码最终会调用scheduled的run方法。不了解Handler原理的同学,可以看下我的这篇文章。这里的scheduled的run方法会调用

run = RxJavaPlugins.onSchedule(run)

;这句代码的run方法,即调用的是ObserveOnObserver类中的run方法,看下ObserveOnObserver类中的run方法的代码,如下

  public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

上面代码中的outputFused的初始值为false,所以会执行else语句中的代码,看下drainNormal的代码,如下

void drainNormal() {
            int missed = 1;
//这个queue是在onSubscribe初始化的,在onNext中将上游的数据添加进去的
            final SimpleQueue<T> q = queue;
  //将下游的observe赋值给a
            final Observer<? super T> a = downstream;
//
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
//开始轮询
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
             //轮询取出q中的值,这里的值就是在上游发射的
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
//取出的值,传递给下游的onNext方法
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

重要的代码,已经在代码中进行了注释,这里就不再讲解,上面代码的作用就是,不断取出上游发射的数据,然后调用下游的onNext方法并将取出的值传递进去。

总结

  分析到这里,算是将RxJava2线程调度的源码理清楚了。可以发现在进行线程调度的时候大量的使用Runnable,一层层的包装,然后在一层层的来调用。首先,将线程切换到工作线程中的方法是将调用上游subscribe方法放在了Runnable类中的run方法中,然后将这个Runnable层层包装后放进线程队列中等待执行,最后在工作线程中处理发射的数据。

  将线程切换到主线程中的方法是利用Handler,将处理好的数据放进一个队列中,放进队列中的这个动作还是在工作线程中完成的,然后,利用Handler将线程切换到主线程,最后不断的取出队列中的数据,不断调用下游的onNext方法。通过这种方式来完成线程的调度。

结束语

  通过上面的分析,可以发现RxJava2线程调度还是挺复杂的,牵涉到的知识点也是比较多的,为了更简单,更有条理的讲解RxJava2线程调度的原理,同时为了让大家不至于在源码中迷失,所以这里分析源码按照代码的执行顺序一步步的进行的。因为,代码执行的时候会在不同的类之间来回切换,所以,大家会发现分析的时候在各个类中跳来跳去。

  由于篇幅的原因,文章的一些知识没有详细的来讲解,如Java的线程池,Handler原理等,大家可以自己查阅相关资料或者留言一起讨论,如果发现文中有不对的地方,也欢迎指正。

相关文章
|
1月前
|
Linux
一个进程最多可以创建多少个线程基本分析
一个进程最多可以创建多少个线程基本分析
228 1
|
3月前
|
监控 Linux 编译器
多线程死锁检测的分析与实现(linux c)-有向图的应用
在日常的软件开发中,多线程是不可避免的,使用多线程中的一大问题就是线程对锁的不合理使用造成的死锁,死锁一旦发生,将导致多线程程序响应时间长,吞吐量下降甚至宕机崩溃,那么如何检测出一个多线程程序中是否存在死锁呢?在提出解决方案之前,先对死锁产生的原因以及产生的现象做一个分析。最后在用有向环来检测多线程中是否存在死锁的问题。
57 0
|
1月前
|
资源调度 算法 Linux
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
118 0
|
5天前
|
SQL Dubbo Java
案例分析|线程池相关故障梳理&总结
本文作者梳理和分享了线程池类的故障,分别从故障视角和技术视角两个角度来分析总结,故障视角可以看到现象和教训,而技术视角可以透过现象看到本质更进一步可以看看如何避免。
83726 0
|
1月前
|
存储 算法 Linux
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
【Linux 系统标准 进程资源】Linux 创建一个最基本的进程所需的资源分析,以及线程资源与之的差异
136 0
|
1月前
|
算法 Linux 调度
Linux 线程介绍:介绍Linux系统中线程的基本概念、创建和调度机制
Linux 线程介绍:介绍Linux系统中线程的基本概念、创建和调度机制
26 0
|
2月前
|
数据处理 UED 开发者
Python并发编程之协程与多线程对比分析
本文将从Python并发编程的角度出发,对比分析协程与多线程两种并发处理方式的优缺点及适用场景,帮助读者更好地选择适合自己项目的并发方案。
|
2月前
|
程序员 测试技术 数据处理
Python中的装饰器应用与实现Python并发编程之协程与多线程对比分析
在Python编程中,装饰器是一种强大的工具,能够简洁而优雅地扩展函数或类的功能。本文将深入探讨Python中装饰器的原理、应用场景以及实现方法,帮助读者更好地理解和运用这一重要的编程概念。 本文将从Python并发编程的角度出发,对比分析协程与多线程两种并发处理方式的优缺点及适用场景,帮助读者更好地选择适合自己项目的并发方案。
|
3月前
|
运维 监控 Java
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
学习JVM需要一定的编程经验和计算机基础知识,适用于从事Java开发、系统架构设计、性能优化、研究学习等领域的专业人士和技术爱好者。
55 5
【深入浅出JVM原理及调优】「搭建理论知识框架」全方位带你深度剖析Java线程转储分析的开发指南
|
3月前
|
算法 程序员 调度
操作系统:线程同步和调度
操作系统:线程同步和调度
25 0