外行人都能看懂的WebFlux,错过了血亏(二)

简介: 这篇文章主要讲解什么是WebFlux,带领大家入个门,希望对大家有所帮助(至少看完这篇文章,知道WebFlux是干嘛用的)

而背压说白了就是:消费者能告诉生产者自己需要多少量的数据。这里就是Subscription接口所做的事。

下面我们来看看JDK9接口的方法,或许就更加能理解上面所说的话了:

// 发布者(生产者)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
// 订阅者(消费者)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
// 用于发布者与订阅者之间的通信(实现背压:订阅者能够告诉生产者需要多少数据)
public interface Subscription {
    public void request(long n);
    public void cancel();
}
// 用于处理发布者 发布消息后,对消息进行处理,再交由消费者消费
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}


3.1 看个例子


代码中有大量的注释,我就不多BB了,建议直接复制跑一下看看:

class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;
        // 请求一个数据
        this.subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);
        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }
        // 处理完调用request再请求一个数据
        this.subscription.request(1);
        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }
    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();
        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }
    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}
public class FlowDemo2 {
    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();
        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);
        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber<String> subscriber = new Subscriber<String>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;
                // 请求一个数据
                this.subscription.request(1);
            }
            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);
                // 处理完调用request再请求一个数据
                this.subscription.request(1);
                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }
            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();
                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }
            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }
        };
        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);
        // 6. 生产数据, 并发布
        publiser.submit(-111);
        publiser.submit(111);
        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();
        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }
}

输出的结果如下:

61.png

流程实际上非常简单的:

62.png

参考资料:

Java 8 的 Stream 主要关注在流的过滤,映射,合并,而  Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调

说白了就是:响应式流是异步非阻塞+流量控制的(可以告诉生产者自己需要多少的量/取消订阅关系)

展望响应式编程的场景应用:

比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;

再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。


四、入门WebFlux


扯了一大堆,终于回到WebFlux了。经过上面的基础,我们现在已经能够得出一些结论的了:

  • WebFlux是Spring推出响应式编程的一部分(web端)
  • 响应式编程是异步非阻塞的(是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式)

我们再回来看官网的图:

63.png


4.1 简单体验WebFlux


Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux

64.jpg

WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,下面我们来看看~

Reactor是一个响应式流,它也有对应的发布者(Publisher ),Reactor的发布者用两个类来表示:

  • Mono(返回0或1个元素)
  • Flux(返回0-n个元素)

而消费者则是Spring框架帮我们去完成

下面我们来看一个简单的例子(基于WebFlux环境构建):

// 阻塞5秒钟
private String createStr() {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
    return "some string";
}
// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {
    log.info("get1 start");
    String result = createStr();
    log.info("get1 end.");
    return result;
}
// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {
    log.info("get2 start");
    Mono<String> result = Mono.fromSupplier(() -> createStr());
    log.info("get2 end.");
    return result;
}

首先,值得说明的是,我们构建WebFlux环境启动时,应用服务器默认是Netty的:

65.jpg

我们分别来访问一下SpringMVC的接口和WebFlux的接口,看一下有什么区别:

SpringMVC:

66.pngWebFlux:

67.png

从调用者(浏览器)的角度而言,是感知不到有什么变化的,因为都是得等待5s才返回数据。但是,从服务端的日志我们可以看出,WebFlux是直接返回Mono对象的(而不是像SpringMVC一直同步阻塞5s,线程才返回)。

这正是WebFlux的好处:能够以固定的线程来处理高并发(充分发挥机器的性能)。

WebFlux还支持服务器推送(SSE - >Server Send Event),我们来看个例子:

/**
     * Flux : 返回0-n个元素
     * 注:需要指定MediaType
     * @return
     */
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
    Flux<String> result = Flux
        .fromStream(IntStream.range(1, 5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            return "flux data--" + i;
        }));
    return result;
}

效果就是每秒会给浏览器推送数据:

68.gif

WebFlux我还没写完,这篇写了WebFlux支持SpringMVC那套注解来开发,下篇写写如何使用WebFlux另一种模式(Functional Endpoints)来开发以及一些常见的问题还需要补充一下~


目录
相关文章
|
7月前
|
XML Java 数据格式
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
🚀今天,我们来详细的聊一聊SpringBoot自动配置原理,学了这么久,你学废了吗?
117 0
|
消息中间件 存储 缓存
国庆假期,整整七天,我用76张图把Spring AOP给画明白了!
国庆假期,整整七天,我用76张图把Spring AOP给画明白了!
|
存储 监控 Dubbo
互联网后端技术栈一览,写得太好了。。(2)
互联网后端技术栈一览,写得太好了。。(2)
475 0
互联网后端技术栈一览,写得太好了。。(2)
|
SQL 分布式计算 监控
互联网后端技术栈一览,写得太好了。。(3)
互联网后端技术栈一览,写得太好了。。(3)
504 0
互联网后端技术栈一览,写得太好了。。(3)
外行人都能看懂的WebFlux,错过了血亏(一)
这篇文章主要讲解什么是WebFlux,带领大家入个门,希望对大家有所帮助(至少看完这篇文章,知道WebFlux是干嘛用的)
4183 0
外行人都能看懂的WebFlux,错过了血亏(一)
|
存储 缓存 JavaScript
想好怎么学 Servlet规范了嘛?想好了嘛?没想好先看看这篇文章(爆肝之作),先看着然后慢慢想!!
Servlet(Server Applet)是Java Servlet的简称,称为小服务程序或服务连接器,用Java编写的服务器端程序,具有独立于平台和协议的特性,主要功能在于交互式地浏览和生成数据,生成动态Web内容。 狭义的Servlet是指Java语言实现的一个接口,广义的Servlet是指任何实现了这个Servlet接口的类,一般情况下,人们将Servlet理解为后者。Servlet运行于支持Java的应用服务器中。从原理上讲,Servlet可以响应任何类型的请求,但绝大多数情况下Servlet只用来扩展基于HTTP协议的Web服务器。
想好怎么学 Servlet规范了嘛?想好了嘛?没想好先看看这篇文章(爆肝之作),先看着然后慢慢想!!
|
XML 缓存 NoSQL
恐怖!这份神仙架构笔记,简直把所有spring boot的核心技术都写出来了!
自从 structs2 出现上次的漏洞以后,对 spring 的关注度开始越来越浓。以前 spring 开发需要配置一大堆的 xml,后台 spring 加入了 annotaion,使得 xml 配置简化了很多,当然还是有些配置需要使用 xml,比如申明 component scan 等。Spring 开了一个新的 model spring boot,主要思想是降低 spring 的入门,使得新手可以以最快的速度让程序在 spring 框架下跑起来。
214 0
|
程序员
程序人生 - 程序员要学点儿理财知识,而不仅仅是代码技巧
程序人生 - 程序员要学点儿理财知识,而不仅仅是代码技巧
139 0
程序人生 - 程序员要学点儿理财知识,而不仅仅是代码技巧
|
Rust Cloud Native Dubbo
舒服,给Spring贡献一波源码。 (下)
舒服,给Spring贡献一波源码。 (下)
142 0
舒服,给Spring贡献一波源码。 (下)
|
IDE Java 开发工具
舒服,给Spring贡献一波源码。 (中)
舒服,给Spring贡献一波源码。 (中)
510 0
舒服,给Spring贡献一波源码。 (中)