概述
java9开始,官方支持了响应式编程规范,提供了顶级的响应式编程接口。
java11开始,官方提供了支持http2的、友好的http客户端java.net.http
,该客户端就是jdk内部第一个基于响应式编程规范的实现。
响应式编程接口
package java.util.concurrent;
//这是Flow API的主要类。该类封装了Flow API的所有重要接口。这是一个final类,我们不能扩展它。
public final class Flow {
private Flow() {} // uninstantiable
//发布者
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
//订阅者
public static interface Subscriber<T> {
//这是订阅者订阅了发布者后接收消息时调用的第一个方法
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
//此接口同时扩展了Publisher和Subscriber接口,用于在发布者和订阅者之间转换消息。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
static final int DEFAULT_BUFFER_SIZE = 256;
public static int defaultBufferSize() {
return DEFAULT_BUFFER_SIZE;
}
}
该响应式接口规范与reactor中的基本一致,每个接口及方法含义请参考 reactor3 源码分析
demo
这里我们只是简单演示一下。借用SubmissionPublisher
作为发布者。
import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class Java9FlowTest {
public static class MySubscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); //这里要使用Long.MAX_VALUE就会被认为获取无穷的数据。
}
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//注册订阅者
MySubscriber<Integer> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
//发布信息
for(int i = 0 ; i < 10 ; i ++){
publisher.submit(i);
TimeUnit.SECONDS.sleep(1);
}
publisher.close();
TimeUnit.SECONDS.sleep(100);//为了等待结束
}
}
输出结果
Got : 0
Got : 1
Got : 2
Got : 3
Got : 4
Done