而背压说白了就是:消费者能告诉生产者自己需要多少量的数据。这里就是Subscription接口所做的事。
下面我们来看看JDK9接口的方法,或许就更加能理解上面所说的话了:
// 发布者(生产者) public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } // 订阅者(消费者) public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } // 用于发布者与订阅者之间的通信(实现背压:订阅者能够告诉生产者需要多少数据) public interface Subscription { public void request(long n); public void cancel(); } // 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费 public interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
3.1 看个例子
代码中有大量的注释,我就不多BB了,建议直接复制跑一下看看:
class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 需要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一个数据, 处理 System.out.println("处理器接受到数据: " + item); // 过滤掉小于0的, 然后发布出去 if (item > 0) { this.submit("转换后的数据:" + item); } // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 全部数据处理完了(发布者关闭了) System.out.println("处理器处理完了!"); // 关闭发布者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定义发布者, 发布的数据类型是 Integer // 直接使用jdk自带的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定义处理器, 对数据进行过滤, 并转换为String类型 MyProcessor processor = new MyProcessor(); // 3. 发布者 和 处理器 建立订阅关系 publiser.subscribe(processor); // 4. 定义最终订阅者, 消费 String 类型数据 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存订阅关系, 需要用它来给发布者响应 this.subscription = subscription; // 请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一个数据, 处理 System.out.println("接受到数据: " + item); // 处理完调用request再请求一个数据 this.subscription.request(1); // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出现了异常(例如处理数据的时候产生了异常) throwable.printStackTrace(); // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); } @Override public void onComplete() { // 全部数据处理完了(发布者关闭了) System.out.println("处理完了!"); } }; // 5. 处理器 和 最终订阅者 建立订阅关系 processor.subscribe(subscriber); // 6. 生产数据, 并发布 publiser.submit(-111); publiser.submit(111); // 7. 结束后 关闭发布者 // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 publiser.close(); // 主线程延迟停止, 否则数据没有消费就退出 Thread.currentThread().join(1000); } }
输出的结果如下:
流程实际上非常简单的:
参考资料:
- https://yanbin.blog/java-9-talk-reactive-stream/#more-8877
- https://blog.csdn.net/wudaoshihun/article/details/83070086
- http://www.spring4all.com/article/6826
- https://www.cnblogs.com/IcanFixIt/p/7245377.html
Java 8 的 Stream 主要关注在流的过滤,映射,合并,而 Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调
说白了就是:响应式流是异步非阻塞+流量控制的(可以告诉生产者自己需要多少的量/取消订阅关系)
展望响应式编程的场景应用:
比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;
再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。
四、入门WebFlux
扯了一大堆,终于回到WebFlux了。经过上面的基础,我们现在已经能够得出一些结论的了:
- WebFlux是Spring推出响应式编程的一部分(web端)
- 响应式编程是异步非阻塞的(是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式)
我们再回来看官网的图:
4.1 简单体验WebFlux
Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux。
WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,下面我们来看看~
Reactor是一个响应式流,它也有对应的发布者(Publisher
),Reactor的发布者用两个类来表示:
- Mono(返回0或1个元素)
- Flux(返回0-n个元素)
而消费者则是Spring框架帮我们去完成
下面我们来看一个简单的例子(基于WebFlux环境构建):
// 阻塞5秒钟 private String createStr() { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } return "some string"; } // 普通的SpringMVC方法 @GetMapping("/1") private String get1() { log.info("get1 start"); String result = createStr(); log.info("get1 end."); return result; } // WebFlux(返回的是Mono) @GetMapping("/2") private Mono<String> get2() { log.info("get2 start"); Mono<String> result = Mono.fromSupplier(() -> createStr()); log.info("get2 end."); return result; }
首先,值得说明的是,我们构建WebFlux环境启动时,应用服务器默认是Netty的:
我们分别来访问一下SpringMVC的接口和WebFlux的接口,看一下有什么区别:
SpringMVC:
WebFlux:
从调用者(浏览器)的角度而言,是感知不到有什么变化的,因为都是得等待5s才返回数据。但是,从服务端的日志我们可以看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。
这正是WebFlux的好处:能够以固定的线程来处理高并发(充分发挥机器的性能)。
WebFlux还支持服务器推送(SSE - >Server Send Event),我们来看个例子:
/** * Flux : 返回0-n个元素 * 注:需要指定MediaType * @return */ @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE) private Flux<String> flux() { Flux<String> result = Flux .fromStream(IntStream.range(1, 5).mapToObj(i -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } return "flux data--" + i; })); return result; }
效果就是每秒会给浏览器推送数据:
WebFlux我还没写完,这篇写了WebFlux支持SpringMVC那套注解来开发,下篇写写如何使用WebFlux另一种模式(Functional Endpoints)来开发以及一些常见的问题还需要补充一下~