面试官:RxJava是如何做到响应式编程的?

简介: RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理

前言

RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: RxJava基本操作符使用 RxJava响应式编程是如何实现的 RxJava的背压机制及Flowable是如何实现背压的 **RxJava的线程切换原理

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的?

使用rxjava创建一个rxbus事件处理框架

RxJava操作符详解--来看看你还记得多少 - 掘金

今天我从源码角度来分析下Rxjava的订阅和事件分发和线程切换过程,看他是如何实现响应式编程的

我们先放一张网络上找到的神图:可以结合源码分析仔细品味

基本流程图 流模式1.jpg

简介:

对于源码分析:我们从一段代码来分析 下面是一段经典代码:

Disposable mDisposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception {
        emitter.onNext("1");
        emitter.onComplete();
    }
}).map(new Function<Object, Object>() {
    @Override
    public Object apply(@NonNull Object o) throws Exception {
        return null;
    }
}).subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {

    }
});
mDisposable.dispose();

下面我们将其分为5个步骤进行分析:

1.创建Observable 2.订阅Observer 3.订阅事件线程切换 4.分发事件线程切换 5.解除订阅dispose

过程源码分析

1.创建Observable

    1.Observable.create:-->new ObservableCreate<T>(source)
    //上面代码创建了一个ObservableCreate 继承Observable
    2.Observable.create.map(fun)-->new ObservableMap<T, R>(this, mapper),
    //也继承Observable
    //这里的this指的是第一步创建的ObservableCreate
    //此时第二步获取的ObservableMap持有第一步的ObservableCreate
    3.Observable.create.map(fun).subscribeOn(io)-->new ObservableSubscribeOn<T>(this, scheduler)
    //同步骤2,这里的this指的是步骤2中的ObservableMap,此时持有第二步中的ObservableMap对象
    4.Observable.create.map(fun).subscribeOn(io).observerOn(io)-->new ObservableObserveOn<T>(this, scheduler)
    //同步骤3,这里的this指的是步骤3中的ObservableSubscribeOn,此时持有第3步中的ObservableSubscribeOn对象
  • 根据以上分析可以看出在调用subscribe提交被观察者之前,都是对前面的被观察者的封装,使用的是装饰器模式
  • 整个链路使用的是责任链模式。按责任划分可以理解为观察者Observer和被观察者Observable模式

    下面是Observable装饰的过程

创建.png

内部关系:
An(A(n-1)(A(n-2)(..A2(A1))):

2.订阅Observer

1.调用subscribe方法的时候:subscribe(onNext)->subscribe(LambdaObserver)->subscribeActual(LambdaObserver); 这里回调的是第n个被观察者的subscribeActual,假设按上面创建流程则上一个被观察者是ObservableObserveOn

ObservableObserveOn.java
protected void subscribeActual(Observer<? super T> observer) {//记住这里的observer是LambdaObserver
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

这里回调ObservableSubscribeOn的subscribeActual方法,传入的是下游的ObserveOnObserver

2.继续来看2.ObservableSubscribeOn.subscribeActual(B)()->

ObservableSubscribeOn.java
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
  • 可以看到这里对传下来的LambdaObserver做了一层封装到SubscribeOnObserver对象parent中
  • 这里调用了scheduler.scheduleDirect(new SubscribeTask(parent)),其实是线程切换操作,具体操作到SubscribeTask的run方法

    SubscribeTask.java
        public void run() {
            source.subscribe(parent);
        }

    这里的source是上游的被观察者ObservableMap-->

ObservableMap.java
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }    

一样将下游传过来SubscribeOnObserver封装到了MapObserver中 同理这里的source是上游的ObservableCreate

ObservableCreate.java
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);//这里是订阅的时候返回的dispose参数,可以使用他终止下面的流程

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
  • 一样将下游传过来MapObserver封装到了CreateEmitter中

    这里调用的source就是最初的新建的ObservableOnSubscribe
    ObservableOnSubscribe.java
        public void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Exception {
            emitter.onNext("1");
            emitter.onComplete();
        }
    emitter为下游的CreateEmitter
    
    上面分析可知,每层也是对下游传过来的观察者Observer进行了封装,和创建Observable类似
    

    使用下面连接方式表示观察者之间的关系:

B1(B2(B3(B4(..Bn-1(Bn-))))) //这里的B1=LambdaObserver

3.订阅事件线程切换

在调用subscribeOn创建的ObservableSubscribeOn中:
@Override
    public void subscribeActual(final Observer<? super T> s) {
        ...
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }    
  • 这里调用了scheduler.scheduleDirect,其实就是开启一个线程池进行处理,切换到指定的线程中,所以可以看出,之后调用的

    ObservableMap的subscribe操作其实都是运行在这个切换后的线程中

    直到遇到调用observerOn操作。下面在onNext线程中分析

    来看分发事件onNext操作:

//在订阅步骤中走到了emitter.onNext("1");
    CreateEmitter.java
    public void onNext(T t) {
        ...
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    //调用了observer.onNext,这里的observer是下游的MapObserver
    MapObserver.java
    public void onNext(T t) {
        ...
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            ...
        }
        actual.onNext(v);
    }
    //调用了传入的function,将返回值做了新的值传入下一个观察者中,
    //由2中可知,下个Observer 是SubscribeOnObserver
    SubscribeOnObserver.java
    public void onNext(T t) {
        actual.onNext(t);
    }
    //直接调用下游的Observer,因为任务主要是切换订阅线程
    //这里的下游观察者是:ObserveOnObserver,注意这个方法是调用了observerOn指定分发线程后订阅的观察者,前面分析中没有这个,是为了减少分析步骤
    //ObserveOnObserver留到下个事件分发线程切换来讲解

下面图表示了线程之间切换操作关系:

线程切换操作.jpg

3.分发事件线程切换

继续3中的分析
ObserveOnObserver.java
    public void onNext(T t) {
        ...
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);//这里将异步操作放到queue中,可以引出背压的概念,缓存事件用
        }
        schedule();
    }
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
  • 这里根据worker对象,切换到对应的线程执行onNext操作

    下面的onNext操作都会执行在这个线程上,直到再次调用obServerOn也会切换到新的线程
  • 通过上面的分析可知,observerOn可以执行多次

    下面是源码内部线程切换过程:

rxjava线程池操作.jpg

5.解除订阅dispose

调用mDisposable.dispose()->

根据之前分析这个mDisposable是最后执行subscribe操作中创建的LambdaObserver

Observable.java
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ...
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        ...
        return ls;
    }
    LambdaObserver.java

    public void dispose() {
        DisposableHelper.dispose(this);
    }
    DisposableHelper.java
    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
  • 将field置为DISPOSED:解除订阅关系

    这里的current = 上游传下来的dispose:依次向上游调用dispose解除订阅

    注意dispose也是用了流模式,每个下游的dispose都持有上游的dispose引用,这个dispose表示每层的观察者Observer

最后来看下Rxjava内部线程池的实现方式

    static final class SingleHolder {
        static final Scheduler DEFAULT = new SingleScheduler();
    }

    static final class ComputationHolder {
        static final Scheduler DEFAULT = new ComputationScheduler();
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
  • 在Schedulers内部创建了几个Scheduler的实现对象, 根据传入的线程方法类型,返回对应的Scheduler对象,如IOScheduler,内部会创建一个CachedWorkerPool对象,而CachedWorkerPool内部有个get方法会创建一个ThreadWorker,后面会用到
  • 在订阅的时候,会调用scheduler.scheduleDirect(new SubscribeTask(parent),这个scheduler = IoScheduler

    执行父类Scheduler的scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();//1

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);//2

        w.schedule(task, delay, unit);//3

        return task;
    }

1.这里创建了一个Worker:

IoScheduler.java
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

2.将源task封装到一个新的DisposeTask中

3.调用1中创建的EventLoopWorker的schedule方法

EventLoopWorker.java
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);//5
    }

4.这个threadWorker是在EventLoopWorker构造方法中生成

EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
        //来看CachedWorkerPool的get方法
        CachedWorkerPool.java

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
        //可以看到创建了一个ThreadWorker,跟踪ThreadWorker的构造方法
        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        //这里直接执行父类NewThreadWorker的构造方法
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);//6
        }
        //可以看到这里创建了一个SchedulerThreadPool的线程池,周期性线程池,5中会用到

5.继续3中的操作调用ThreadWorker的scheduleActual

这个方法在父类NewThreadWorker中实现了

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            ...
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                ...
            }
            return sr;
        }
  • 执行了executor.submit方法:executor在4中分析过是SchedulerPoolFactory
  • 调用其submit接口,将任务放到线程池中去执行,这样就实现了线程切换操作
同理调用observerOn接口设置next线程切换也是用到一个道理,这里就不再描述

线程池原理:

线程池.png

总结

以上就是我对RxJava源码内部流程和结构的分析,希望对大家有帮助

喜欢的关注下,后期会推出更多Android涉及的各方面知识。

关于RxJava的其他系列文章,可以点击下方链接

面试官:RxJava背压机制有了解么?

面试官:RxJava是如何做到响应式编程的?

使用rxjava创建一个rxbus事件处理框架

RxJava操作符详解--来看看你还记得多少 - 掘金

相关文章
|
9月前
|
存储 缓存 JavaScript
深入浅出 RxJS 核心原理(响应式编程篇)
在最近的项目中,我们面临了一个需求:监听异步数据的更新,并及时通知相关的组件模块进行相应的处理。传统的事件监听和回调函数方式可能无法满足我们的需求,因此决定采用响应式编程的方法来解决这个问题。在实现过程中发现 RxJS 这个响应式编程库,可以很高效、可维护地实现数据的监听和组件通知。
181 0
深入浅出 RxJS 核心原理(响应式编程篇)
|
存储 缓存 数据处理
面试官:RxJava背压机制有了解么?
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理** 关于RxJava的其他系列文章,可以点击下方链接
|
存储 前端开发 Dubbo
响应式编程的实践
响应式编程的实践
响应式编程的实践
|
存储 JavaScript 前端开发
面试官:请手写一个EventBus,让我看看你的代码能力!
前言 EventBus是事件总线的意思,可不是什么事件车。 事件总线模式在工作中经常使用,在面试中也很容易问到。甚至在很多面试中会让你手写一个EventBus,那么EventBus到底是个什么东西,今天我们就来学一学!
253 0
面试官:请手写一个EventBus,让我看看你的代码能力!
|
Java 开发者
手撕源码!线程池核心组件源码剖析
看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。
136 0
手撕源码!线程池核心组件源码剖析
|
存储 算法 搜索推荐
正在准备面试?你需要了解这14种编程模式
正在准备面试?你需要了解这14种编程模式
正在准备面试?你需要了解这14种编程模式
|
Java 调度
RxJava 线程模型分析
RxJava 线程模型分析
142 0
RxJava 线程模型分析
|
设计模式 前端开发 Java
探究netty的观察者设计模式
探究netty的观察者设计模式
124 0
|
Java
RxJava1 升级到 RxJava2 所踩过的坑
RxJava1 升级到 RxJava2 所踩过的坑
229 0
RxJava1 升级到 RxJava2 所踩过的坑
|
消息中间件 Java 调度
抽丝剥茧聊Kotlin协程之协程启动原理
抽丝剥茧聊Kotlin协程之协程启动原理
抽丝剥茧聊Kotlin协程之协程启动原理