RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如
Observable.just("aaa","bbb") .observeOn(Schedulers.newThread()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { return s.toUpperCase(); } }) .subscribeOn(Schedulers.single()) .observeOn(Schedulers.io()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println(s); } });
被观察者(Observable、Flowable...)发射数据流之后,其操作符可以在不同的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。
下图不同的箭头颜色表示不同的线程。
schedulers.png
一. 线程调度器
Schedulers 是一个静态工厂类,通过分析Schedulers的源码可以看到它有多种不同类型的Scheduler。下面是Schedulers的各个工厂方法。
computation()用于CPU密集型的计算任务,但并不适合于IO操作。
@NonNull public static Scheduler computation() { return RxJavaPlugins.onComputationScheduler(COMPUTATION); }
io()用于IO密集型任务,支持异步阻塞IO操作,这个调度器的线程池会根据需要增长。对于普通的计算任务,请使用Schedulers.computation()。
@NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }
trampoline()在RxJava2中跟RxJava1的作用是不同的。在RxJava2中表示立即执行,如果当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完之后,再将原先未完成的任务接着执行。在RxJava1中表示在当前线程中等待其他任务完成之后,再执行新的任务。
@NonNull public static Scheduler trampoline() { return TRAMPOLINE; }
newThread()为每个任务创建一个新线程。
@NonNull public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); }
single()拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,它的任务们将会按照先进先出的顺序依次执行。
@NonNull public static Scheduler single() { return RxJavaPlugins.onSingleScheduler(SINGLE); }
除此之外,还支持自定义的Executor来作为调度器。
@NonNull public static Scheduler from(@NonNull Executor executor) { return new ExecutorScheduler(executor); }
RxJava 线程模型.png
Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码可以看到,Scheduler在scheduleDirect()、schedulePeriodicallyDirect()方法中创建了Worker,然后会分别调用worker的schedule()、schedulePeriodically()来执行任务。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; } public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w); Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit); if (d == EmptyDisposable.INSTANCE) { return d; } return periodicTask; }
Worker也是一个抽象类,从上图可以看到每一种Scheduler会对应一种具体的Worker。
public abstract static class Worker implements Disposable { public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) { final SequentialDisposable first = new SequentialDisposable(); final SequentialDisposable sd = new SequentialDisposable(first); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); final long periodInNanoseconds = unit.toNanos(period); final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS); final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay); Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd, periodInNanoseconds), initialDelay, unit); if (d == EmptyDisposable.INSTANCE) { return d; } first.replace(d); return sd; } public long now(@NonNull TimeUnit unit) { return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); } ... }
1.1 SingleScheduler
SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫executor,它是使用AtomicReference包装的ScheduledExecutorService。
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
在SingleScheduler构造函数中,executor会调用lazySet()。
public SingleScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; executor.lazySet(createExecutor(threadFactory)); }
它的createExecutor()用于创建工作线程,可以看到通过SchedulerPoolFactory来创建ScheduledExecutorService。
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) { return SchedulerPoolFactory.create(threadFactory); }
在SchedulerPoolFactory类的create(ThreadFactory factory) 中,使用newScheduledThreadPool线程池定义定时器,最大允许线程数为1。
public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); if (exec instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; POOLS.put(e, exec); } return exec; }
在SingleScheduler中每次使用ScheduledExecutorService,其实是使用executor.get()。所以说,single拥有一个线程单例。
SingleScheduler会创建一个ScheduledWorker,ScheduledWorker使用jdk的ScheduledExecutorService作为executor。
下面是ScheduledWorker的schedule()方法。使用ScheduledExecutorService的submit()或schedule()来执行runnable。
@NonNull @Override public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks); tasks.add(sr); try { Future<?> f; if (delay <= 0L) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delay, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { dispose(); RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } return sr; }
1.2 ComputationScheduler
ComputationScheduler使用FixedSchedulerPool作为线程池,并且FixedSchedulerPool被AtomicReference包装了一下。
从ComputationScheduler的源码中可以看出,MAX_THREADS是CPU的数目。
FixedSchedulerPool可以理解为拥有固定数量的线程池,数量为MAX_THREADS。
static { MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); ...... } static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; }
ComputationScheduler会创建一个EventLoopWorker。
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); }
其中,getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。
public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)]; }
PoolWorker继承自NewThreadWorker,它也是线程数为1的ScheduledExecutorService。
1.3 IoScheduler
IoScheduler使用CachedWorkerPool作为线程池,并且CachedWorkerPool也是被AtomicReference包装了一下。
CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来创建的。
static { ...... WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); ...... NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); ...... }
在RxThreadFactory中,由 prefix 和 incrementAndGet() 来创建新线程的名称。
@Override public Thread newThread(Runnable r) { StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet()); String name = nameBuilder.toString(); Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true); return t; }
IoScheduler创建的线程数是不固定的,可以通过IoScheduler 的 size() 来获得当前的线程数。而ComputationScheduler的线程数一般情况等于CPU的数目。
public int size() { return pool.get().allWorkers.size(); }
特别需要的是 ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet() 产生的递增值来决定线程的名字;而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为CPU的数目,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
同样,IoScheduler也会创建EventLoopWorker。
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
但是这个EventLoopWorker是IoScheduler的内部类,跟ComputationScheduler创建的EventLoopWorker是不一样的,只是二者的名称相同罢了。
1.4 NewThreadScheduler
NewThreadScheduler会创建NewThreadWorker。我们看到NewThreadWorker的构造函数也是使用SchedulerPoolFactory。
public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); }
跟SingleScheduler不同的是,SingleScheduler的executor是使用AtomicReference包装的ScheduledExecutorService。每次使用时,会调用executor.get()。
然而,NewThreadScheduler每次都会创建一个新的线程。
1.5 TrampolineScheduler
TrampolineScheduler会创建TrampolineWorker,在TrampolineWorker内部维护着一个PriorityBlockingQueue。任务进入该队列之前,会先用TimedRunnable封装一下。
static final class TimedRunnable implements Comparable<TimedRunnable> { final Runnable run; final long execTime; final int count; // In case if time between enqueueing took less than 1ms volatile boolean disposed; TimedRunnable(Runnable run, Long execTime, int count) { this.run = run; this.execTime = execTime; this.count = count; } @Override public int compareTo(TimedRunnable that) { int result = ObjectHelper.compare(execTime, that.execTime); if (result == 0) { return ObjectHelper.compare(count, that.count); } return result; } }
我们可以看到TimedRunnable实现了Comparable接口,会比较任务的execTime和count。
任务在进入queue之前,count每次都会+1。
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet()); queue.add(timedRunnable);
所以,使用TrampolineScheduler时,每次新的任务都会优先执行。
二. 线程调度
在默认情况下不做任何线程处理,Observable和Observer是处于同一线程中的。如果想要切换线程的话,可以使用subscribeOn()和observeOn()。
2.1 线程调度subscribeOn
subscribeOn通过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。
若多次执行subscribeOn,则只有一次起作用。
点击subscribeOn()的源码可以看到,每次调用subscribeOn()都会创建一个ObservableSubscribeOn对象。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
其中,SubscribeOnObserver是下游的Observer通过装饰器模式生成的。它实现了Observer、Disposable接口。
接下来,在上游的线程中执行下游Observer的onSubscribe(Disposable disposabel)方法。
s.onSubscribe(parent);
然后,将子线程的操作加入Disposable管理中,加入Disposable后可以方便上下游的统一管理。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
在这里,已经调用对应scheduler的scheduleDirect()方法。scheduleDirect() 传入的是一个Runnable,也就是下面的SubscribeTask。
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
SubscribeTask会执行run()对上游的Observable进行订阅。
此时,已经在对应的Scheduler线程中运行了。
source.subscribe(parent);
在RxJava的链式操作中,数据的处理是自下而上,这点跟数据发射正好相反。如果多次调用subscribeOn,最上面的线程切换最晚执行,所以变成了只有第一次切换线程才有效。
2.2 线程调度observeOn
observeOn同样接收一个Scheduler参数,用来指定下游操作运行在特定的线程调度器Scheduler上。
若多次执行observeOn,则每次均起作用,线程会一直切换。
点击observeOn()的源码可以看到,每次调用observeOn()都会创建一个ObservableObserveOn对象。
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer<? super T> observer)。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
如果scheduler是TrampolineScheduler,上游事件和下游事件会立即产生订阅。
如果不是的话,scheduler会创建自己的Worker,然后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象包装了下游真正的Observer。
ObserveOnObserver是ObservableObserveOn的内部类,实现了Observer、Runnable接口。跟SubscribeOnObserver不同的是,SubscribeOnObserver实现了Observer、Disposable接口。
在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
其中,worker是当前scheduler创建的Worker,this指的是当前的ObserveOnObserver对象,this实现了Runnable接口。
然后,我们看看Runnable接口的实现方法run(),这个方法是在worker对应的线程里执行的。drainNormal()会取出 ObserveOnObserver 的 queue 里的数据进行发送。
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
下游多次调用observeOn()的话,线程会一直切换。每一次切换线程,都会把对应的Observer对象的各个方法的处理执行在指定的线程中。
三. 示例
举一个多次调用subscribeOn、observeOn的例子。
Observable.just("HELLO WORLD") .subscribeOn(Schedulers.single()) .map(new Function<String, String>() { @Override public String apply(@NonNull String s) throws Exception { s = s.toLowerCase(); L.i("map1",s); return s; } }) .observeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { s = s+" tony."; L.i("map2",s); return s; } }) .subscribeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { s = s+"it is a test."; L.i("map3",s); return s; } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { L.i("subscribe",s); System.out.println(s); } });
执行结果.png
四. 总结
了解RxJava的线程模型、线程调度器、线程调度是非常有意义的。能够帮助我们更合理地使用RxJava。另外,RxJava的线程切换结合链式调用非常方便,比起Java使用线程操作实在是简单太多了。