09.RxJava线程调度源码分析

简介: 上一次分析了RxJava的运作流程,其中的线程调度方面只是简单提了两句,以我看来,线程调度是RxJava中非常重要的一环,所以今天单独拿出来分析一下。subscribeOnobserveOnsubscribeOn调用可以将之前的操作加如线程池,从...

上一次分析了RxJava的运作流程,其中的线程调度方面只是简单提了两句,以我看来,线程调度是RxJava中非常重要的一环,所以今天单独拿出来分析一下。

subscribeOn
observeOn

subscribeOn调用可以将之前的操作加如线程池,从而保证运行于子线程中,observeOn会使后边的执行运行于主线程,这里的之前和后边均是指的代码结构上的前后

subscribeOn

经过上一篇的分析,可以知道,当subscribeOn调用的时候,会创建一个ObservableSubscribeOn对象返回,与此同时,上一级产生的对象会被保存在当前对象的source变量中,并且,将创建出一个线程池,先看线程池的创建,这里直接以io线程为例
Schedulers.io(Schedulers.io())

public static Scheduler io() {
       return RxJavaPlugins.onIoScheduler(IO);
}

其中的IO是在Schedulers类加载的时候就创建出来的,从这个结构可以看出,IO就是IoScheduler对象,RxJavaPlugins.initIoScheduler方法接收一个Callable线程,返回callable.call,也就是call方法中返回的就是这个函数的返回值(Callable是另一种开启线程的方式,这个线程有返回值,当返回值获取到之前,会阻塞当前线程)

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        });

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

那么IoScheduler是什么?当IoScheduler创建的时候

    public IoScheduler() {
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

NONE是IoScheduler中创建的一个线程池,所以IoScheduler其实就是一个封装好了的线程池对象

    static final CachedWorkerPool NONE;
    static {
        NONE = new CachedWorkerPool(0, null);
    }

    CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

Schedulers.io(Schedulers.io())的调用,执行了两个动作,第一,保存上一级的对象,第二创建线程池

observeOn(AndroidSchedulers.mainThread())

接下来来看主线程的切换,调用observeOn方法,创建ObservableObserveOn对象,同样保存上一级产生的对象到source中,这里指的就是subscribeOn返回的对象ObservableSubscribeOn,并且保存传入的Scheduler--AndroidSchedulers.mainThread()

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

进入AndroidSchedulers.mainThread(),与上边同样的写法,最后返回HandlerScheduler

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    private static final class MainHolder {
        //可以猜测这个HandlerScheduler是一个通过对Handler进行封装
        //运行于主线程的线程,可以看到Looper.getMainLooper()传入了一个主线程的
        //looper对象,事实上也是如此
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

所以,很类似,observeOn(AndroidSchedulers.mainThread())同样是做了两件事,保存source和Scheduler,那么两种线程是如何进行调度的,其实看到这里,还没有进入正题,真正的逻辑其实在subscribe方法上。

subscribe

以subscribe(new Observer<String>())为例说明(new Consumer最终源码也是相同的),调用subscribe方法后会来到Observable的抽象方法subscribeActual中,所以我们要到当前Observable实现类中找这个方法,按照上边程序调用的顺序,此时,调用subscribe方法的对象是observeOn方法产生的ObservableObserveOn,进入这个类,找到subscribeActual方法

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //这个scheduler是指AndroidSchedulers.mainThread(),也就是HandlerScheduler
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //创建一个worker
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

进入HandlerScheduler找到createWorker方法,这里创建了一个HandlerWorker对象,看到这里大概也可以猜测一下,HandlerWorker中的schedule方法将会是一个关键,传入的handler是主线程中的handler,明显是要通过消息机制发送到主线程执行,问题的关键,在于是怎么发送到主线程执行的,schedule方法的具体执行我们暂且不看,按照程序执行顺序继续往下走

@Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private volatile boolean disposed;
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ......
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            ......
            return scheduled;
        }
        ......
    }

在创建了worker之后,调用方法subscribe,source很明显是ObservableObserveOn对象创建的时候所保存的上一级的调用subscribeOn方法产生的ObservableSubscribeOn对象,通过这个对象调用subscribe方法,又会进入到ObservableSubscribeOn的subscribeActual方法。observer指的是我们代码中传入的observer(subscribe时new的那个),这里对observer封装了一层,以ObserveOnObserver的形式传入到ObservableSubscribeOn的subscribeActual方法中,向上层传递了一级,可以参考08.RxJava运作流程源码分析中提供的流程图

//source指ObservableSubscribeOn对象
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

来到ObservableSubscribeOn的subscribeActual

@Override
    //参数s指的时对observer封装了一层之后的ObserveOnObserver(new ObserveOnObserver(new Observer ))
    public void subscribeActual(final Observer<? super T> s) {
        //对ObserveOnObserver对象进行一次封装
        //此时Observer已经被封装了两层
        //(new SubscribeOnObserver(new ObserveOnObserver(new Observer)))
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //调用ObserveOnObserver对象的onSubscribe
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

看看ObserveOnObserver的onSubscribe方法

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //注意这里这个判断这次是不会满足的,也就是这里的代码不会走
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    //同步,如果是要同步执行,就是指如果设置了在主线程执行,那么
                    //就执行schedule(),往下看可以发现是使用我我们创建的worker
                    //发送到主线程执行
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        //actual指的就是我们传入的最原始的那个observer
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    //异步,如果是异步执行,直接在当前线程执行,当前线程也就是子线程
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //actual是我们new的那个Observer,所以这里直接回调了onSubscribe方法
                actual.onSubscribe(this);
            }
        }

scheduler就是Schedulers.io()得到的就是IoSchedule对象,在上边分析subscribeOn方法时我们已经知道这个对象是一个线程池,调用scheduleDirect方法就是将SubscribeTask这个Runnable放进了线程池执行,并且是在子线程中

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

createWorker()是个抽象类,在IoSchedule中找到重写的方法

@Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

所以这样一来也就是说new SubscribeTask(parent))这个Runnable被放入了线程池执行,这时候会调用它的run方法,这样就又回到了调用上一级产生对象的subscribe方法中去了,不同的是此时subscribe已经是在线程池中执行了(子线程)

        @Override
        public void run() {
            source.subscribe(parent);
        }

就这样一级一级的往上调用,下一个会走到ObservableMap的subscribeActual方法,最后走到ObservableJust的subscribeActual,s.onSubscribe(sd)方法并没有执行什么东西,onSubscribe在之前已经被调用了,重点在 sd.run()

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

终于在这里要看到onNext onComplete方法的执行了

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                //observer 是 new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......})))
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

此时一层一层的调用到这里,observer对象已经是经过层层封装包裹的observer了(new MapObserver(new SubscribeOnObserver(new ObserveOnObserver(new Observer(){......})))),所以调用observer.onNext会首先执行MapObserver中的onNext,不管用户调用了几次map操作符,都会一个一个的通过回调onNext方法执行完成(如果有多个map方法被调用,当执行完一个apply方法后,后边的actual.onNext就会进入下一个MapObserver中的onNext方法),当执行到最后一个onNext方法的时候,此时这个actual表示的就是SubscribeOnObserver对象了,也就会去执行它里边的onNext

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                //执行apply方法,也就是map操作符中的回调方法
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

SubscribeOnObserver中的onNext,这里的actual指的是ObserveOnObserver,所以又要去执行它的onNext

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

ObserveOnObserver中的onNext

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //这个worker是AndroidScheduler.mainThread得到的一个运行于主线程的封装类 HandlerWorker         
                worker.schedule(this);
            }
        }

在分析observeOn方法的时候我们已经知道这个worker是AndroidScheduler.mainThread得到的一个运行于主线程的封装类 HandlerWorker ,worker.schedule(this)传入的是一个Runnable,也就是会在主线程中执行这个Runnable,我们找到重写的run方法。终于找到onNext和onComplete的最终执行的地方了,并且我们知道,这两个方法是在主线程执行的

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                //会执行这个,上边那个先不管
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

到这里,RxJava线程调度的实现方式基本上我们已经了解了。
这里可以插一个题外话,通常我们使用handler发送的消息都是在handleMessage方法中执行,但是这里我们无论如何找不到这个方法的实现,那么handler是如何处理消息的?

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ......
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            ......
            return scheduled;
        }

可以看到这里Message message = Message.obtain(handler, scheduled),看一下obtain方法的源码会发现传入的第二个参数是一个callback,保存到了message的成员变量m.callback中,当handler调用sendMessageDelayed会将消息加入主线程的消息队列(因为handler就是主线程的handler),我们知道应用启动就会初始化一个主线程的handler一个looper和messageQueue(对消息机制不理解的可以看另一篇15.源码阅读(安卓消息机制)),调用looper.loop开启一个无限循环不断的从主线程消息队列中取消息,我们看看它是如何取的

public static void loop() {
    for (;;) {
            Message msg = queue.next(); // might block
            ......
            msg.target.dispatchMessage(msg);
      }
}

无限循环中取到message后会执行发送这个Message的handler中的dispatchMessage方法,这时候会判断callback也就是我们上边那个传入的,如果它不能与null,就执行handleCallback,执行callback的run方法,找到这里终于找到为什么没有handlerMessage仍然可以处理消息了

public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

private static void handleCallback(Message message) {
        message.callback.run();
    }

传入的callback是哪个,就是 Message message = Message.obtain(handler, scheduled)中的schedule,schedule是哪个ScheduledRunnable ,也就是说执行的是ScheduledRunnable 的run方法,delegate就是ScheduledRunnable 中传入的那个runnable,追溯上去,这个runnable就是worker.schedule(this)中的this,所以可以找到重写的run方法

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }
相关文章
|
1月前
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
|
4月前
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
94 0
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
3月前
|
存储 Java 数据处理
进程中的线程调度
进程是应用程序运行的基本单位,包括主线程、用户线程和守护线程。计算机由存储器和处理器协同操作,操作系统设计为分时和分任务模式。在个人PC普及后,基于用户的时间片异步任务操作系统确保了更好的体验和性能。线程作为进程的调度单元,通过覆写`Thread`类的`run`方法来处理任务数据,并由系统调度框架统一管理。微服务架构进一步将应用分解为多个子服务,在不同节点上执行,提高数据处理效率与容错性,特别是在大规模数据存储和处理中表现显著。例如,利用微服务框架可以优化算法,加速业务逻辑处理,并在不同区块间分配海量数据存储任务。
|
7月前
|
资源调度 算法 Linux
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
Linux进程/线程的调度机制介绍:详细解析Linux系统中进程/线程的调度优先级规则
1764 0
|
5月前
|
监控 安全 Java
Java中的线程调度与性能优化技巧
Java中的线程调度与性能优化技巧
|
5月前
|
缓存 监控 Java
(十)深入理解Java并发编程之线程池、工作原理、复用原理及源码分析
深入理解Java并发编程之线程池、工作原理、复用原理及源码分析
|
5月前
|
Java Linux API
深入理解Java中的多线程调度策略
深入理解Java中的多线程调度策略
|
7月前
|
算法 调度
【操作系统】处理机调度的基本概念和三个层次、进程调度的时机和方式、调度器、闲逛线程
【操作系统】处理机调度的基本概念和三个层次、进程调度的时机和方式、调度器、闲逛线程
647 3
|
7月前
|
存储 Java 调度
Java多线程基础-1:通俗简介操作系统之进程的管理与调度
操作系统是一个复杂的软件,具备许多功能。其中,进程的管理与调度是与我们密切相关的。本文将对操作系统功能中进程管理与调度作出介绍。
75 0