Demo的源码地址在 mini-rxjava, 有兴趣的可以下载源码来看.
从观察者模式说起
观察者模式,是我们在平时使用的比较多的一种设计模式.
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己. 它的特点是 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。
本文的重点不是在介绍 观察者模式,因此这个话题就不展开讨论.
Rxjava 是一种 函数式编程,或者称为链式调用模型,也是使用观察者模式来实现事件的传递与监听.
下面我们来看,Rxjava 和 普通观察者的点区别.
- 普通的观察者模式通常是 一个主题,多个观察者配合,基本上是属于 一对多的情况.
- Rxjava的观察者模式通常是 一对一的关系.
- 普通的观察者模式是 主题数据改变时,通知观察者数据的变动
- Rxjava的观察者模式是 在被观察者(主题)调用
subscribe
方法后,触发数据流动和观察者接收事件.
基础知识介绍到这里,接下来我们来自己动手编写Rxjava
编写Rxjava
看一个常见的rxjava的使用示例,(原始数据,数据转换,线程切换,数据接收处理一系列功能):
public static void main(String[] args) throws InterruptedException {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
})
.observeOn(Schedulers.io())
.map(s -> Integer.parseInt(s) * 10)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(1);
}
// 10
// 20
接下来,我会一步一步带领大家实现上述所有的功能.
一个简单的观察者模式
// Observer.java
// 观察者
public interface Observer<T> {
void onUpdate(T t);
}
// ObservableSource.java
// 被观察者(主题)接口
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
// Observable.java
// 具体的被观察者(主题)
public class Observable<T> implements ObservableSource<T> {
private T t;
public Observable(T t) {
this.t = t;
}
@Override
public void subscribe(Observer<? super T> observer) {
// 调用订阅时,触发观察者更新
observer.onUpdate(t);
}
}
使用:
public static void main(String[] args) {
// 观察者
Observer<String> observer = s -> System.out.println(s);
// 被观察者(主题)
Observable<String> observable = new Observable<>("hello");
// 调用
observable.subscribe(observer);
}
// hello
这样,算是一个简单的观察模式了,但是这种方式很不灵活,数据在构造中直接传入了.
接下来我们来改造一下 Observable.java
类. 可以传入一个接口来定义数据的传递规则,并且为Observable
写一个适配器和一个事件分发器,为原始事件的产生提供支持.
- 添加
Emitter
接口,它是一个事件分发的接口; - 添加
ObservableOnSubscribe
接口,它是创建Observable
实例的桥梁,并且有生产事件的功能,支持lambda方式调用; - 添加
ObservableCreate
类,它是Observable
的适配器,能够根据ObservableOnSubscribe
接口,快速创建一个Observable
实例;并且内部类CreateEmitter
实现了Emitter
接口,用于事件的分发; - 修改
Observable
类,添加工厂方法,能够根据ObservableOnSubscribe
接口,快速构建Observable
实例;
// Emitter.java
// 事件分发器接口
public interface Emitter<T> {
void onUpdate(T t);
}
// ObservableOnSubscribe.java
// Observable的事件分发接口
public interface ObservableOnSubscribe<T> {
void subscribe(Emitter<T> emitter) throws Exception;
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
// 工厂方法,生产出一个Observable实例
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<>(source);
}
// 真正进行事件分发处理的方法
abstract void subscribeActual(Observer<? super T> observer) throws Exception;
@Override // 交给subscribeActual实现,需要子类实现
public void subscribe(Observer<? super T> observer) throws Exception {
subscribeActual(observer);
}
}
// ObservableCreate.java
// Observable的一个适配器,用于快速创建一个可以发送事件的Observable
class ObservableCreate<T> extends Observable<T> {
// 事件分发接口
private final ObservableOnSubscribe<T> source;
ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override // 分发逻辑的具体代码
void subscribeActual(Observer<? super T> observer) throws Exception {
CreateEmitter<T> emitter = new CreateEmitter<>(observer);
source.subscribe(emitter);
}
// 内部分发器
static class CreateEmitter<T> implements Emitter<T> {
private final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override // 这里只是简单的将observer观察者的事件直接分发出去
public void onUpdate(T t) {
observer.onUpdate(t);
}
}
}
ObservableSource.java
和Observer.java
没有修改,固没有贴出,未贴出代码请查看上一步.使用 :
public static void main(String[] args) throws Exception {
Observable.create(emitter -> {
emitter.onUpdate("hello");
emitter.onUpdate("world");
})
.subscribe(System.out::println);
}
// hello
// world
哇! 你会发现,到此为止,你已经使用观察模式实现了一个简易的函数式编程的代码了.
如果你完全理解了上述代码是怎么产生的,那么恭喜你,你已经理解rxjava最最核心的原理了.
添加事件结束和异常捕获
上诉的代码,还不能够捕获异常和结束事件,这样使用起来很不方便,接下来我们来改造实现它.
仿造rxjava,我们也将事件分为onNext
,onError
,onComplete
三个事件.
需要修改 分发器接口和观察者接口,以及Observable
的适配器.
- 修改
Observer
接口和Emitter
接口, 改为onNext
,onError
,onComplete
方法; - 修改
ObservableCreate
类,添加异常处理和结束的逻辑;
// Observer.java
public interface Observer<T> {
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
// Emitter.java
public interface Emitter<T> {
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
// ObservableCreate.java
class ObservableCreate<T> extends Observable<T> {
// 事件分发接口
private final ObservableOnSubscribe<T> source;
ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override // 分发逻辑代码
void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> emitter = new CreateEmitter<>(observer);
try {
source.subscribe(emitter);
} catch (Exception e) {
// 异常接收和处理
emitter.onError(e);
}
}
// 内部分发器
static class CreateEmitter<T> implements Emitter<T> {
private final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
}
修改过这是三个类后,我们就能接收异常和结束了.使用:
public static void main(String[] args) {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
})
.subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("end...");
}
});
}
// 1
// 2
// end...
嗯!? 虽然实现了接收异常和结束的功能,但是有时我们只需要onNext
事件时,这样的代码写起来不够优雅.
接下来我们编写一个观察者的适配器,让它能够使用 lambda
表达式来优雅的编写代码.
- 添加
Consumer
接口,它是接收一个参数,无返回值的接口,用途是进行lambda方式进行参数传递; - 添加
Action
接口,它是一个不接受参数,无返回值和的接口,用途也是进行lambda方式进行参数传递; - 添加
Functions
类,它是一个辅助类,能获取空Consumer
和空Action
实现; - 添加
LambdaObserver
类,它会将lambda参数形式,转化为Observer
实例,进而实现lambda式的调用; - 修改
Observable
类, 添加void subscribe(Consumer<? super T> next, Action complete, Consumer<? super Throwable> error)
系列的方法,让subscribe
方法真正支持 lambda式调用.
// Consumer.java
// 接受一个参数,无返回的接口
public interface Consumer<T> {
void apply(T t) throws Exception;
}
// Action.java
// 不接受参数,无返回的接口
public interface Action {
void apply() throws Exception;
}
// Functions.java
public class Functions {
public static final Action EMPTY_ACTION = () -> {};
public static <T> Consumer<T> emptyConsumer() {
return t -> {};
}
}
// LambdaObserver.java
public class LambdaObserver<T> implements Observer<T> {
private final Consumer<? super T> onNext;
private final Consumer<? super Throwable> onError;
private final Action onComplete;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
}
@Override
public void onNext(T t) {
try {
onNext.apply(t);
} catch (Exception e) {
onError(e);
}
}
@Override
public void onError(Throwable e) {
try {
onError.apply(e);
} catch (Exception e1) {
e1.printStackTrace();
}
}
@Override
public void onComplete() {
try {
onComplete.apply();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
// 工厂方法,生产出一个Observable实例
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<>(source);
}
// 真正进行事件分发处理的方法
abstract void subscribeActual(Observer<? super T> observer);
@Override // 交给subscribeActual实现,需要子类实现
public void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
// 接受3个lambda表达式的方法参数
public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) {
LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete);
subscribe(lambdaObserver);
}
// 接受2个lambda表达式的方法参数
public void subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) {
subscribe(next, error, Functions.EMPTY_ACTION);
}
// 接受1个lambda表达式的方法参数
public void subscribe(Consumer<? super T> next) {
subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION);
}
}
接下来是调用:
public static void main(String[] args) {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
}).subscribe(System.out::println);
}
// 1
// 2
啧啧!?有点激动了,已经看起来有模有样了,但是功能远远还不够.
添加 是否消费事件的功能(中断事件功能)
在上诉的代码中,我们无法主动中断整个事件发生的过程.接下来我们就需要编写 Disposable
来实现onComplete
和onError
的自中断,以及主动取消事件.
Rxjava中,Disposable
是使用 枚举类型加上原子引用(AtomicReference
)类来实现线程安全(具体可查看DisposableHelper
类).这种方式比较繁琐,这里就不用这种方式来演示,而使用 volatile
声明的状态变量来做同步安全.
- 添加
Disposable
接口,提供中断和是否中断的方法; - 修改
Observer
接口, 添加onSubscribe
方法,让观察者可以在事件传递前,获取Disposable
,进而可以在事件传递的任意阶段中断事件; - 修改
ObservableCreate
类,添加观察者回调onSubscribe
,此回调需在事件分发前才能起到作用;内部类CreateEmitter
实现Disposable
接口,在事件分发前先判断是否被中断了;使用volatile
变量实现中断判断; - 修改
LambdaObserver
类,让它实现Disposable
接口,添加是否中断的判断; - 修改
Observable
类,添加onSubscribe
lambda调用;以及返回Disposable
实例;
// Disposable.java
public interface Disposable {
void dispose();
boolean isDisposed();
}
// Observer.java
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
// ObservableCreate.java
// Observable的一个适配器,用于快速创建一个可以发送事件的Observable
final class ObservableCreate<T> extends Observable<T> {
// 事件分发接口
private final ObservableOnSubscribe<T> source;
ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override // 分发逻辑代码
void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> emitter = new CreateEmitter<>(observer);
// 将中断器回调给observer
observer.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Exception e) {
// 异常接收和处理
emitter.onError(e);
}
}
// 内部分发器
static class CreateEmitter<T> implements Emitter<T>, Disposable {
private final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// 如果事件没被消费,则进行操作
if (!isDisposed())
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
if (!isDisposed()) {
try {
observer.onError(e);
} finally {
// 触发消费,后续不再处理事件
dispose();
}
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// 触发消费,后续不再处理事件
dispose();
}
}
}
private volatile boolean isDisposed = false;
@Override
public void dispose() {
isDisposed = true;
}
@Override
public boolean isDisposed() {
return isDisposed;
}
}
}
// LambdaObserver.java
public class LambdaObserver<T> implements Observer<T>, Disposable {
private final Consumer<? super T> onNext;
private final Consumer<? super Throwable> onError;
private final Action onComplete;
private final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onSubscribe(Disposable d) {
try {
onSubscribe.apply(d);
} catch (Exception e) {
onError(e);
}
}
@Override
public void onNext(T t) {
if (!isDisposed())
try {
onNext.apply(t);
} catch (Exception e) {
onError(e);
}
}
@Override
public void onError(Throwable e) {
if (!isDisposed())
try {
onError.apply(e);
} catch (Exception e1) {
e1.printStackTrace();
} finally {
dispose();
}
}
@Override
public void onComplete() {
if (!isDisposed())
try {
onComplete.apply();
} catch (Exception e) {
e.printStackTrace();
} finally {
dispose();
}
}
private volatile boolean isDisposed = false;
@Override
public void dispose() {
isDisposed = true;
}
@Override
public boolean isDisposed() {
return isDisposed;
}
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
// 工厂方法,生产出一个Observable实例
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<>(source);
}
// 真正进行事件分发处理的方法
abstract void subscribeActual(Observer<? super T> observer);
@Override // 交给subscribeActual实现,需要子类实现
public void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
// 接受4个lambda表达式的方法参数
public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error,
Action complete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> lambdaObserver = new LambdaObserver<>(next, error, complete, onSubscribe);
subscribe(lambdaObserver);
return lambdaObserver;
}
// 接受3个lambda表达式的方法参数
public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error, Action complete) {
return subscribe(next, error, complete, Functions.emptyConsumer());
}
// 接受2个lambda表达式的方法参数
public Disposable subscribe(Consumer<? super T> next, Consumer<? super Throwable> error) {
return subscribe(next, error, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
// 接受1个lambda表达式的方法参数
public Disposable subscribe(Consumer<? super T> next) {
return subscribe(next, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
}
到此,我们就能够控制事件的中断了.我们来看使用:
private Disposable mDisposable = null;
void disposableTest() {
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
})
.subscribe(integer -> {
System.out.println(integer);
// 3 事件将不再传递和接收
if (integer == 2 && mDisposable != null)
mDisposable.dispose();
},
Functions.emptyConsumer(), Functions.EMPTY_ACTION,
d -> mDisposable = d);
}
// 1
// 2
// 这种方式只在,异步的情况下使用,由于示例中目前还不支持异步,因此以下代码起不了作用.
void disposableTest2() {
Disposable disposable = Observable
.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
})
.subscribe(System.out::println);
disposable.dispose();
}
像这样,在事件分发前,拿到Disposable
对象,这样你能在任意阶段中断 这个过程.
至此, 我们实现了基本的事件发送和lambda调用,以及中断功能.接下来我们需要开始添加 操作符了, 让它真的能达到函数式调用的模样!
添加 操作符(Map)
操作符的重点是, 你需要处理好上游传递下来的Disposable
对象,以及下游待传递的Observer
.
下面我们来实现map
操作符的功能, 它可以将一个类型转化为另一个类型.
- 添加
Function
接口,该接口接收一个类型的参数,并且返回另一个类型的值; - 添加
BasicObserver
类,该类实现Observer
和Disposable
接口,用于传递上下游的数据; - 添加
ObservableMap
类,该类继承Observable
,并且使用Function
接口,实现类型转换;内部类MapObserver
继承自BasicObserver
实现具体转换逻辑; - 修改
Observable
类,添加map
操作符;
// Function.java
// 接收一个类型的参数,返回一个类型
public interface Function<T, R> {
R apply(T t) throws Exception;
}
// BasicObserver.java
public abstract class BasicObserver<T, R> implements Observer<T>, Disposable {
// 上游传递的Disposable对象
private Disposable upstream;
// 下游接收的观察者对象
final Observer<? super R> downstream;
// 如果已经中断,则无需下传
boolean done;
BasicObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
@Override
public void onSubscribe(Disposable d) {
// 接收上游的Disposable
this.upstream = d;
downstream.onSubscribe(this);
}
@Override
public void onError(Throwable e) {
if (done) return;
done = true;
downstream.onError(e);
}
@Override
public void onComplete() {
if (done) return;
done = true;
downstream.onComplete();
}
@Override
public void dispose() {
upstream.dispose();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
// ObservableMap.java
final class ObservableMap<T, U> extends Observable<U> {
private final ObservableSource<T> source;
private final Function<? super T, ? extends U> function;
ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
this.source = source;
this.function = function;
}
@Override
void subscribeActual(Observer<? super U> observer) {
source.subscribe(new MapObserver<T, U>(observer, function));
}
static class MapObserver<T, U> extends BasicObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) return;
try {
// 这里实现具体的转化,下游接收到转化后的类型变量
downstream.onNext(mapper.apply(t));
} catch (Exception e) {
onError(e);
}
}
}
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
...
// map操作符
public <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<>(this, mapper);
}
}
实现好了map
功能,我们来验证一下:
void mapTest() {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
})
.map(s -> Integer.parseInt(s) * 10)
.subscribe(System.out::println);
}
// 10
// 20
哇!! 有点不可思议,已经做到这一步了,我们还差什么呢? 没错,就是线程切换!!
PS : Rxjava中有很多的操作符, 我用其中比较典型的
map
来做示范,其他操作符,有兴趣的可以自己手动来实现.
线程切换
rxjava中,自己实现了一套功能强大的线程池.配合操作符 observeOn
,subscribeOn
来进行线程切换.这里就不对其进行展开,我们的重点是自己实现.
由于rxjava的线程池调度相当的复杂, 这里为了方便演示,将只采用jdk自带的线程池来做线程调度.下面我们来实现, rxjava中 observeOn
操作符,以及Schedulers.io()
的线程调度.
- 添加
Scheduler
接口,定义了调度的方法,提交任务,移除任务,停止线程池; - 添加
IOScheduler
类,是Scheduler
的具体实现,采用newCachedThreadPool
提交任务和中断任务; - 添加
ObservableObserveOn
类,继承Observable
,为observeOn
操作符提供支持;内部类ObserveOnObserver
,实现Observer
,Disposable
和Runnable
接口,run
方法将拦截所有事件,将其作为任务提交给线程池运行,达到异步的效果; - 修改
Observable
类,添加observeOn
操作符;
// Scheduler.java
public interface Scheduler {
void submit(Runnable runnable);
void remove(Runnable runnable);
void shutdown();
}
// IOScheduler
public class IOScheduler implements Scheduler {
// 线程池
private final ExecutorService executor = Executors.newCachedThreadPool();
// 保存Future对象,为了能够中断指定线程
private final Map<Runnable, Future> futureMap = new ConcurrentHashMap<>();
@Override
public void submit(Runnable runnable) {
Future future = futureMap.get(runnable);
// 如果对应的任务正在执行,则无需再提交
if (future != null && !future.isDone()) return;
if (executor.isShutdown()) return;
futureMap.put(runnable, executor.submit(runnable));
}
@Override
public void remove(Runnable runnable) {
Future future = futureMap.get(runnable);
if (future == null) return;
try {
future.cancel(true);
} catch (Exception ignored) {
} finally {
futureMap.remove(runnable);
}
}
@Override
public void shutdown() {
if (!executor.isShutdown())
executor.shutdown();
}
}
// ObservableObserveOn.java
public final class ObservableObserveOn<T> extends Observable<T> {
private final ObservableSource<T> source;
private final Scheduler scheduler;
ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new ObserveOnObserver<>(observer, scheduler));
}
static final class ObserveOnObserver<T> implements Observer<T>, Disposable, Runnable {
private final Observer<? super T> downstream;
private final Scheduler scheduler;
private Disposable upstream;
private volatile boolean done;
private volatile boolean disposed;
private Queue<T> queue = new LinkedList<>();
private Throwable error;
ObserveOnObserver(Observer<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
upstream = d;
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
if (done) return;
queue.offer(t);
schedule();
}
@Override
public void onError(Throwable t) {
if (done) return;
done = true;
error = t;
schedule();
}
@Override
public void onComplete() {
if (done) return;
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
scheduler.remove(this);
queue.clear();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
// 提交任务
void schedule() {
scheduler.submit(this);
}
// 检查事件是否已中断,并作出相应的反馈
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
scheduler.remove(this);
return true;
} else if (empty) {
disposed = true;
a.onComplete();
scheduler.remove(this);
return true;
}
}
return false;
}
@Override // 拦截事件传递,到run方法,run方法将由线程池运行
public void run() {
final Queue<T> q = queue;
final Observer<? super T> a = downstream;
for (; ; ) {
if (checkTerminated(done, q.isEmpty(), a)) return;
for (; ; ) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
scheduler.remove(this);
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) return;
if (empty) break;
a.onNext(v);
}
}
}
}
}
// Observable.java
public abstract class Observable<T> implements ObservableSource<T> {
...
// 线程调度操作符
public final Observable<T> observeOn(Scheduler scheduler) {
return new ObservableObserveOn<>(this, scheduler);
}
}
接下来我们来看示例:
public void schedulerTest() throws InterruptedException {
Observable.create((ObservableOnSubscribe<String>) emmit -> {
System.out.println("emmit : " + Thread.currentThread().getName());
emmit.onNext("1");
emmit.onNext("2");
emmit.onComplete();
})
.observeOn(Schedulers.io())
.map(it -> {
int r = Integer.parseInt(it) * 10;
System.out.println(r + " : map : " + Thread.currentThread().getName());
return r;
}).subscribe(it -> System.out.println(it + " : observer : " + Thread.currentThread().getName()),
Functions.emptyConsumer(),
() -> System.out.println("onCompleted.... " + Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(1);
}
emmit : main
10 : map : pool-1-thread-1
10 : observer : pool-1-thread-1
20 : map : pool-1-thread-1
20 : observer : pool-1-thread-1
onCompleted.... pool-1-thread-1
可以看到 observeOn
的所有下游事件,都在新的线程中运行了!!至此,线程调度的部分功能,我们也粗略的实现了!
总结
如果,你从开始看到现在, 我们已经自己实现了rxjava的一个基本使用操作了,编写了10来个类,其中大部分都是接口,写了500多行的代码.其中涉及rxjava中事件分发,lambda调用,取消和中断,map
操作符,io
线程切换,这一完整的流程.
rxjava 提供了丰富的 操作符, 和 各种的线程切换模型, 我们在理解其原理的情况下,都可以自己来实现.
rxjava中 RxJavaPlugins
使用代理的思想来插入全局资源管理,以及使用Backpressure
(背压)来控制数据流的思想,我们都可以学习和借鉴!
我们在学习过程中,可以根据源码,分解其中的知识点,逐步消化,甚至自己动手来实现它, 来达到深入理解的目的.
最后,赶紧动手编写吧!!!