本文需要知识点
lambda表达式
先写一个lambda表达式的小例子,注意,本文不是讲lambda表达式,所以这里仅仅是能够看到接下来的文章做铺垫而已。
@Test
public void test11() {
Function<Integer, Integer> mapper = i -> i * i;//定义lambda表达式
Integer result = mapper.apply(2); //执行
System.out.println(result);
}
输出结果是4
自定义lambda:
//定义lambda函数式接口
@FunctionalInterface
public interface MyLambda {
int add(int a, int b);
}
@Test
public void test12() {
//使用自定义的lambda
MyLambda myLambda = (a, b) -> a + b;
result = myLambda.add(2 ,3);
System.out.println(result);
}
输出结果5
reactor源码分析
分析之前,先看一下reactor提供的顶级抽奖接口
发布者Publisher
是一个可以发送无限序列元素的发布者。他可以根据订阅者的要求来发布他们。
//发布者
public interface Publisher<T> {
/**
* 订阅方法
* 请求发布者启动数据流
* @param s 消费者
*/
public void subscribe(Subscriber<? super T> s);
}
org.reactivestreams.Publisher#subscribe
是一个工厂方法,允许调用多次,每次调用都会启动一个新的Subscription
。每个Subscription只能被一个Subscriber使用;Subscriber消费者只能订阅一次Publisher。如果在执行过程中出错,则会发出error信号。
订阅者(消费者)
public interface Subscriber<T> {
/**
* 该方法在调用Publisher#subscribe(Subscriber)后执行
* 在Subscription#request(long)调用之前不会有数据流消费
* 如果订阅者想要消费更多的数据,需要调用Subscription#request(long)来请求数据。
*/
public void onSubscribe(Subscription s);
/**
* 消费下一个消息
* 在调用Subscription#request(long)方法时,Publisher会通过这个方法来通知订阅者消息
*
* @param t 数据元素
*/
public void onNext(T t);
/**
* 出错
*/
public void onError(Throwable t);
/**
* 执行完成
*/
public void onComplete();
}
- 通过
org.reactivestreams.Subscriber#onSubscribe
官方注释可以看出,在调用onSubscribe方法之前,数据流不会消费。如果想要消费必须有消费者调用才行。这种模式是冷数据流模式。 -
org.reactivestreams.Subscriber#onSubscribe
只会被调用一次 -
error
和Complete
是结束终止信号,当发出该信号之后不会再有任何新的信号发出 -
onNext
方法可以被调用一次或多次,最多被调用的次数由Subscription#request(long)
的参数决定
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
-
Subscription
的生命周期是订阅者对发布者的一次消费。 - 一个Subscription只能被一个Subscriber使用
- Subscription不仅允许请求数据,也允许取消对数据的请求,并且支持资源清理
- 在通过
org.reactivestreams.Subscription#request
发送需求信号之前,不会有任何事件产生。该方法请求的数量最大是Long.MAX_VALUE
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
-
Processor
实现了一种即是发布者又是消费者的组件。后续再讲map的时候会讲到。
先从最简单的实例开始入手。
@Test
public void test13() {
Flux.just(1, 2, 3, 4, 5)
.subscribe(new CoreSubscriber<>() {//这里传入CoreSubscriber对象作为订阅者
@Override
public void onSubscribe(Subscription s) {
log.info("onSubscribe, {}", s.getClass());
s.request(5);
}
@Override
public void onNext(Integer integer) {
log.info("onNext: {}", integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
log.info("onComplete");
}
});
}
运行程序,输出:
17:24:57.303 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
17:24:57.308 [main] INFO MyTest - onSubscribe, class reactor.core.publisher.FluxArray$ArraySubscription
17:24:57.310 [main] INFO MyTest - onNext: 1
17:24:57.310 [main] INFO MyTest - onNext: 2
17:24:57.310 [main] INFO MyTest - onNext: 3
17:24:57.310 [main] INFO MyTest - onNext: 4
17:24:57.310 [main] INFO MyTest - onNext: 5
17:24:57.310 [main] INFO MyTest - onComplete
Flux
是一个发布者,他实现了Publisher
接口。我们看just方法
public static <T> Flux<T> just(T... data) {
return fromArray(data);
}
public static <T> Flux<T> fromArray(T[] array) {
if (array.length == 0) {
return empty();
}
if (array.length == 1) {
return just(array[0]);
}
return onAssembly(new FluxArray<>(array));
}
可以看到,它返回的是一个FluxArray
对象,将我们传进去的参数封装为一个数组。
final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {
final T[] array;//保存数据
public FluxArray(T... array) {
this.array = Objects.requireNonNull(array, "array");
}
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
if (array.length == 0) {
Operators.complete(s);
return;
}
if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
}
else {//上面demo会走这个分支
//这里的onSubscribe调用的就是demo中我们自定义的CoreSubscriber中的onSubscribe方法,方法内会打印日志并且调用request请求数据
s.onSubscribe(new ArraySubscription<>(s, array));
}
}
//上面示例demo中调用的subscribe方法就是这个;actual是我们new的CoreSubscriber,array是我们初始化的1,2,3,4,5数组
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
subscribe(actual, array);
}
static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {
//省略
}
//省略
}
FluxArray
继承了Flux
也是一个发布者。内部有一个final 的属性array用于存储数据。
在publisher#subscribe(Subscriber<? super T>)
方法中调用Subscriber#onSubscribe()
是固定流程。在上面org.reactivestreams.Subscriber#onSubscribe
有介绍到。
接下来我们看一下Subscription
。这里使用的是ArraySubscription
:
static final class ArraySubscription<T>
implements InnerProducer<T>, SynchronousSubscription<T> {
final CoreSubscriber<? super T> actual;//我们自定义的CoreSubscriber
final T[] array;//存储数据
int index;
//下面这两个都是volatile ,保证了可见性
volatile boolean cancelled;//流是否已经取消
volatile long requested;//记录请求了多少个
//使用AtomicLongFieldUpdater将requested进行了封装,保证了原子性更新
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ArraySubscription> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ArraySubscription.class, "requested");
ArraySubscription(CoreSubscriber<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
@Override
public void request(long n) {
if (Operators.validate(n)) {
if (Operators.addCap(REQUESTED, this, n) == 0) {//每次请求request都会更新requested属性的值
if (n == Long.MAX_VALUE) {
fastPath();
}
else {
slowPath(n);
}
}
}
}
void slowPath(long n) {
final T[] a = array;
final int len = a.length;
final Subscriber<? super T> s = actual;
int i = index;
int e = 0;
for (; ; ) {
if (cancelled) {//每次都会判断事件流是否已经取消
return;
}
while (i != len && e != n) {
T t = a[i];
if (t == null) {
s.onError(new NullPointerException("The " + i + "th array element was null"));
return;
}
//调用我们自定义的CoreSubscriber.onNext()方法来处理消息。
s.onNext(t);
if (cancelled) {
return;
}
i++;
e++;
}
if (i == len) {//数据已经全部被消费,发送完成信号。我们这个demo中一次性取了5个数据,直接取完,所以到了这里就直接发送完成信号结束了。
s.onComplete();
return;
}
n = requested;
if (n == e) {
index = i;
n = REQUESTED.addAndGet(this, -e);
if (n == 0) {
return;
}
e = 0;
}
}
}
//取消
@Override
public void cancel() {
cancelled = true;
}
}
ArraySubscription
定义了订阅消费数据的过程,在一次订阅的生命周期中,通过不断回调onNext方法来消费消息。消费完成后,通过回调onComplete()方法结束整个流程,也就是通常说的发出完成信号。
整个流程是采用的观察者模式,通过对象的传递与回调来实现的发布与消费,时序图与类图如下:
好了,上面介绍了关键的Publisher,Subscriber及Subscription。接下来看一下map这种即是订阅者又充当发布者的角色。
我们把上面的demo修改一下:
@Test
publicvoid test13() {
Flux.just(1, 2, 3, 4, 5)
.map(i -> i * i)
.subscribe(new CoreSubscriber<>() {
//省略,与之前相同
});
}
直接跟进到map方法:
//这里mapper是map的参数,lambda表达式 : i->i*i
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
if (this instanceof Fuseable) {
//this是FluxArray, 他实现了Fuseable接口,所以走这一步
return onAssembly(new FluxMapFuseable<>(this, mapper));
}
return onAssembly(new FluxMap<>(this, mapper));
}
看一下FluxMapFuseable<T, R>
类, T 是源数据, R是经过计算后的结果数据。
final class FluxMapFuseable<T, R> extends FluxOperator<T, R> implements Fuseable {
//lambda表达式:需要执行的计算
final Function<? super T, ? extends R> mapper;
FluxMapFuseable(Flux<? extends T> source,
Function<? super T, ? extends R> mapper) {
super(source);//这里source是fluxArray对象,
this.mapper = Objects.requireNonNull(mapper, "mapper");
}
@Override
@SuppressWarnings("unchecked")
public void subscribe(CoreSubscriber<? super R> actual) {
if (actual instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
source.subscribe(new MapFuseableConditionalSubscriber<>(cs, mapper));
return;
}
//这里source是fluxArray对象,
source.subscribe(new MapFuseableSubscriber<>(actual, mapper));
}
static final class MapFuseableSubscriber<T, R>
implements InnerOperator<T, R>,
QueueSubscription<R> {
final CoreSubscriber<? super R> actual;
final Function<? super T, ? extends R> mapper;
boolean done;
QueueSubscription<T> s;
int sourceMode;
MapFuseableSubscriber(CoreSubscriber<? super R> actual,
Function<? super T, ? extends R> mapper) {
this.actual = actual;//这个actual是demo实例中我们自己创建的CoreSubscriber对象
this.mapper = mapper;//map中的lambda表达式
}
//当调用FluxArray的subscribe后会调用这个onSubscribe方法
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = (QueueSubscription<T>) s;
actual.onSubscribe(this);
}
}
//调用FluxArray的request方法后,会调用这里的OnNext方法,这里充当了消费者的角色
@Override
public void onNext(T t) {
if (sourceMode == ASYNC) {
actual.onNext(null);
}
else {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return;
}
R v;
try {//这里使用mapper.apply(t)来执行demo中的lambda表达式,v是执行结果。这里充当了消费者的角色
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
if (e_ != null) {
onError(e_);
}
else {
s.request(1);
}
return;
}
//调用demo中我们自己创建的CoreSubscriber的onNext方法,这里其实充当了发布者的角色
actual.onNext(v);
}
}
//省略部分代码
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}
@Override
public void request(long n) {
//这个s是
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
//省略部分代码
}
}
上面可以看到,map的执行过程中,并不会产生数据,只是传送数据过程中对其执行一些计算。map即是消费者也是发布者。这个map的调用链路比较长,画个图清晰一下:
好了,相必有上面的图,再看这段代码就清晰多了。这里有很多对象的传递,回调,比较复杂。