外行人都能看懂的WebFlux,错过了血亏(二)

简介: 这篇文章主要讲解什么是WebFlux,带领大家入个门,希望对大家有所帮助(至少看完这篇文章,知道WebFlux是干嘛用的)

而背压说白了就是:消费者能告诉生产者自己需要多少量的数据。这里就是Subscription接口所做的事。

下面我们来看看JDK9接口的方法,或许就更加能理解上面所说的话了:

// 发布者(生产者)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
// 订阅者(消费者)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
// 用于发布者与订阅者之间的通信(实现背压:订阅者能够告诉生产者需要多少数据)
public interface Subscription {
    public void request(long n);
    public void cancel();
}
// 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}


3.1 看个例子


代码中有大量的注释,我就不多BB了,建议直接复制跑一下看看:

class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;
        // 请求一个数据
        this.subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);
        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }
        // 处理完调用request再请求一个数据
        this.subscription.request(1);
        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }
    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();
        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }
    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}
public class FlowDemo2 {
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();
        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);
        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber<String> subscriber = new Subscriber<String>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;
                // 请求一个数据
                this.subscription.request(1);
            }
            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);
                // 处理完调用request再请求一个数据
                this.subscription.request(1);
                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }
            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();
                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }
            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
        };
        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);
        // 6. 生产数据, 并发布
        publiser.submit(-111);
        publiser.submit(111);
        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();
        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
}

输出的结果如下:

61.png

流程实际上非常简单的:

62.png

参考资料:

Java 8 的 Stream 主要关注在流的过滤,映射,合并,而  Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调

说白了就是:响应式流是异步非阻塞+流量控制的(可以告诉生产者自己需要多少的量/取消订阅关系)

展望响应式编程的场景应用:

比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;

再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。


四、入门WebFlux


扯了一大堆,终于回到WebFlux了。经过上面的基础,我们现在已经能够得出一些结论的了:

  • WebFlux是Spring推出响应式编程的一部分(web端)
  • 响应式编程是异步非阻塞的(是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式)

我们再回来看官网的图:

63.png


4.1 简单体验WebFlux


Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux

64.jpg

WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,下面我们来看看~

Reactor是一个响应式流,它也有对应的发布者(Publisher ),Reactor的发布者用两个类来表示:

  • Mono(返回0或1个元素)
  • Flux(返回0-n个元素)

而消费者则是Spring框架帮我们去完成

下面我们来看一个简单的例子(基于WebFlux环境构建):

// 阻塞5秒钟
private String createStr() {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
    return "some string";
}
// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {
    log.info("get1 start");
    String result = createStr();
    log.info("get1 end.");
    return result;
}
// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {
    log.info("get2 start");
    Mono<String> result = Mono.fromSupplier(() -> createStr());
    log.info("get2 end.");
    return result;
}

首先,值得说明的是,我们构建WebFlux环境启动时,应用服务器默认是Netty的:

65.jpg

我们分别来访问一下SpringMVC的接口和WebFlux的接口,看一下有什么区别:

SpringMVC:

66.pngWebFlux:

67.png

从调用者(浏览器)的角度而言,是感知不到有什么变化的,因为都是得等待5s才返回数据。但是,从服务端的日志我们可以看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。

这正是WebFlux的好处:能够以固定的线程来处理高并发(充分发挥机器的性能)。

WebFlux还支持服务器推送(SSE - >Server Send Event),我们来看个例子:

/**
     * Flux : 返回0-n个元素
     * 注:需要指定MediaType
     * @return
     */
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
    Flux<String> result = Flux
        .fromStream(IntStream.range(1, 5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            return "flux data--" + i;
        }));
    return result;
}

效果就是每秒会给浏览器推送数据:

68.gif

WebFlux我还没写完,这篇写了WebFlux支持SpringMVC那套注解来开发,下篇写写如何使用WebFlux另一种模式(Functional Endpoints)来开发以及一些常见的问题还需要补充一下~


目录
相关文章
|
7月前
|
缓存 Java 数据库
最熟悉的陌生人Spring框架
大家好,我是苍何。最近思考了一个问题,为什么会出现公司面试造火箭,工作扭螺丝的现象,包括各种八股文的连环大绝杀问到你不会为主,其实这是考察你的知识面以及掌握的深度,而为什么需要这样呢?归其原因,无非是通过筛选找到那些会思考的人,他们需要的并不是CRUD的工具人,而是会思考能创新的工程师。
52 0
|
8月前
|
消息中间件 缓存 安全
讲理论,重实战!阿里独家SpringBoot王者晋级之路小册,太强了!
大家平时学习SpringBoot的方式也一般是看大量博客或者是找一些业界评价好点的书籍,虽然SpringBoot相关资料很多,但是大多不成体系,很少有真正有能从0到1,详解Spring Boot一切从代码案例出发的案头笔记。 今天给小伙伴分享的就是来自阿里的SpringBoot王者晋级之路小册,这份小册从SpringBoot的开发环境部署开始,把Spring Boot搭建Web项目、操作数据库、使用缓存、日志、整合安全框架、结合消息队列和搜索框架,以及在实际应用中的部署全部讲得清清楚楚。
|
3月前
|
XML Java 数据格式
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
73 0
|
11月前
|
SQL XML 存储
MyBatis这样用,同事直呼哇塞,堪称最佳实践
MyBatis是一款非常流行的ORM框架,相信很多小伙伴都在使用。我们经常会把它和MyBatis-Plus或者MBG一起使用,用多了之后对于其一些常规操作就不太熟悉了。最近总结了下MyBatis的实用用法和技巧,希望对大家有所帮助!
|
11月前
|
前端开发 Java 数据库
SpringBoot日记本系统全程直播01:先把框架搞起来撒~~
SpringBoot日记本系统全程直播01:先把框架搞起来撒~~
107 0
|
消息中间件 存储 缓存
国庆假期,整整七天,我用76张图把Spring AOP给画明白了!
国庆假期,整整七天,我用76张图把Spring AOP给画明白了!
|
算法 NoSQL API
到底该不该看源码(懂这三点儿就够了)
1、不要为了看源码而看源码 2、代码积累到一定程度,遇到问题自然就去查源码了,然后你就看懂了 3、两年内不要刻意去看源码,可以点开简单了解一下就行,前两年疯狂做项目就行了,后期项目做的多了,你自己就会有疑问,每次写代码就会问自己为什么要这样写?底层的原理是什么?很自觉的带着问题就去看源码了,如果你没有这样的疑问,那说明你也不适合去看源码了,写写业务代码,了了一生
148 0
|
前端开发 Oracle 算法
卷王必备学习的MyBatis-Plus用法,不来瞧瞧吗~~
卷王必备学习的MyBatis-Plus用法,不来瞧瞧吗~~
115095 1
卷王必备学习的MyBatis-Plus用法,不来瞧瞧吗~~
|
前端开发 Java Linux
当 Swagger 遇上 Torna,瞬间高大上了
Swagger作为一款非常流行的API文档生成工具,相信很多小伙们都在用!用多了可能会觉得它界面丑、功能弱。今天给大家推荐一款工具Torna,配合Swagger使用可以搭建界面漂亮、功能强大的API文档网站,希望对大家有所帮助! SpringBoot实战电商项目mall(50k+star)地址:github.com/macrozheng/…
|
SQL 安全 大数据
【基础理论-WEB测试】面试官让你介绍下web测试,标准的高大上回答如下:
【基础理论-WEB测试】面试官让你介绍下web测试,标准的高大上回答如下: