08.RxJava运作流程源码分析

简介: RxJava线程切换非常方便,只要调用subscribeOn(Schedules.io())就可以使前边的操作运行于子线程,调用obsersableOn(AndroidSchedules.

RxJava线程切换非常方便,只要调用subscribeOn(Schedules.io())就可以使前边的操作运行于子线程,调用obsersableOn(AndroidSchedules.mainThread())就可以设置后边的代码运行于主线程,那么是如此神奇,他是如何实现的?

今天就以下边的代码为切入点深入源码看一下

Observable.just("我是网络图片url").map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply1 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply1");
                s = s +" 加上一个时间戳后";
                return s;
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply2 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply2");
                s = s +" 加上第二个参数后";
                return s;
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe() {
                Log.i(TAG, "onSubscribe thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(TAG, "onNext thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onNext:"+s+" 开启下载这个图片");

            }

            @Override
            public void onError(@NonNull Throwable throwable) {
                Log.i(TAG, "onError");
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onComplete:下载完成");
            }
        });

程序运行流程图如下


img_27a9bf2011f4a2b3fb0517a97b9f2f8f.png
RxJava运行流程图.png

just方法

创建一个ObservableJust对象返回,并将just传入的参数保保存为value

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

@SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

map方法

同样道理,创建一个ObservableMap对象,由于map方法由上边的ObservableJust对象调用,所以构造方法中传入的this表示的就是ObservableJust对象,创建ObservableMap对象后,保存上一级产生的ObservableJust为当前ObservableMap对象中的成员变量source,保存当前function回调接口,这样一来,当前对象持有上一级ObservableJust的引用。不管map调用几次,当前对象都会持有上一级产生的对象的引用

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

假设再次调用map之后,这个map就是由上一次调用map产生的ObservableMap对象调用的,此时会将上一级这个ObservableMap对象保存到当前对象的source成员变量中,就这样,一级套一级

subscribeOn方法

产生一个ObservableSubscribeOn对象,并将上一级的ObservableMap对象保存为当前对象的source变量,保存传入的scheduler,那么这个scheduler是什么?

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

subscribeOn(Schedulers.io())方法使上边的操作在子线程中执行,Schedulers.io()就是上边传入的schedulers,我们看一下schedulers是如何创建的
来到Schedulers类中

public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

可以找到IO对象是在本类静态代码块中创建的

 static {
        ....
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ....
    }

IOTask是一个实现了Callable接口的线程

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

线程执行会得到Scheduler,可以看到,这是以内部类形式实现的单例模式

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

可以看到,这个IoScheduler内部是线程池实现的

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

也就是说,当我们在代码中设置了这个操作之后(subscribeOn(Schedulers.io())),会创建一个线程池(如果存在就不必创建),很明显,最终将会需要放在子线程中执行的方法在这个线程池中执行,从而达到切换线程的效果,目前看到这里,这只能作为一个猜想,我们继续往下看

observeOn(AndroidSchedulers.mainThread())方法

这个方法执行会保存一个运行于主线程的Scheduler,这个主线程Scheduler如何创建的?
AndroidSchedulers中

private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

可以看到这个Scheduler是通过封装Handler得到的一个运行于主线程的封装类,这里将它保存起来。最后我们看subscribe方法

subscribe方法

public final void subscribe(Observer<? super T> observer) {
        ......
            subscribeActual(observer);
        ......
    }

protected abstract void subscribeActual(Observer<? super T> observer);

由于subscribeActual方法是抽象的,那么要从其子类中找,subscribe方法由上次操作observeOn方法得到的ObservableObserveOn对象调用,所以会执行这个类中的subscribeActual方法,进入ObservableObserveOn中

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

scheduler就是之前保存的AndroidSchedulers.mainThread对象,这里的source表示的就是上一级产生的Observable对象,具体到当前代码,就是ObservableSubscribeOn,调用ObservableSubscribeOn中的subscribe方法,逐层向上传递,直到传递到ObservableJust对象中,再不断的调用map中传入的function回调方法apply,当apply方法调用完成,再执行Observer的onNext onComplete方法,具体流程见上边的流程图,下一篇博客我将会详细分析线程调度的源码。
到这里,这段示例代码的流程已经走了一遍

写了一个简化版的RxJava,实现了just map subscribeOn obserseOn方法,有助于对原理的理解,GitHub地址:https://github.com/renzhenming/MyRxJava

相关文章
|
23天前
|
前端开发 编译器 Android开发
构建高效Android应用:探究Kotlin协程的异步处理机制
【4月更文挑战第2天】在现代移动应用开发中,提供流畅且响应迅速的用户体验是至关重要的。随着Android平台的发展,Kotlin语言凭借其简洁性和功能性编程的特点成为了主流选择之一。特别地,Kotlin协程作为一种新型的轻量级线程管理机制,为开发者提供了强大的异步处理能力,从而显著提升了应用程序的性能和响应速度。本文将深入探讨Kotlin协程在Android中的应用,分析其原理、实现以及如何通过协程优化应用性能。
23 2
|
6月前
|
索引
etcd源码分析 - 4.【打通核心流程】processInternalRaftRequestOnce四个细节
在上一讲,我们继续梳理了`PUT`请求到`EtcdServer`这一层的逻辑,并大概阅读了其中的关键函数`processInternalRaftRequestOnce`。
43 0
|
26天前
|
存储 安全 Java
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理
14 0
|
20天前
|
移动开发 API Android开发
构建高效安卓应用:探究Kotlin协程的异步处理机制
【4月更文挑战第5天】 在移动开发领域,为了提升用户体验,应用必须保持流畅且响应迅速。然而,复杂的后台任务和网络请求往往导致应用卡顿甚至崩溃。本文将深入探讨Kotlin协程——一种在Android平台上实现轻量级线程管理的先进技术,它允许开发者以简洁的方式编写异步代码。我们将分析协程的核心原理,并通过实际案例演示其在安卓开发中的运用,以及如何借助协程提高应用性能和稳定性。
19 8
|
3月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
109 0
|
10月前
|
存储 资源调度 Kubernetes
K8S | 核心应用原理分析
K8S作为开源的容器编排引擎,用来对容器化应用进行自动化部署、 扩缩和管理;
132 0
K8S | 核心应用原理分析
|
Android开发 iOS开发 计算机视觉
【第四篇】XiaoZaiMultiAutoAiDevices之核心机制
在上一期说到主要的流程和部分核心运行流程,这一期我们主讲:`如何通过外部参数指定脚本运行指定设备` 测试框架传参,可能一部分同学会想到unittest的DDT,使用pytest相关装饰器和各种外部文件的数据传入方式。
|
Java 调度 数据库
【Java技术指南】「难点-核心-遗漏」Java线程状态流转及生命周期的技术指南(知识点串烧)!
【Java技术指南】「难点-核心-遗漏」Java线程状态流转及生命周期的技术指南(知识点串烧)!
128 0
【Java技术指南】「难点-核心-遗漏」Java线程状态流转及生命周期的技术指南(知识点串烧)!