Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(下)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
日志服务 SLS,月写入数据量 50GB 1个月
云解析 DNS,旗舰版 1个月
简介: Project Reactor 深度解析 - 2. 响应式编程调试,FLow的概念设计以及实现(下)

响应式编程 - Flow 的理解


之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:

java.util.concurrent.Flow Flow 是一个概念类,其中定义了三个接口供实现。这三个接口分别是:Publisher, SubscriberSubscription


//标注是一个FunctionalInterface,因为只有一个抽象方法
@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
public static interface Subscription {
    public void request(long n);
    public void cancel();
}


Publisher是负责生成 item 的,其中的subscribe方法就是注册Subscriber进去,用于消费。注册成功后,会调用SubscriberonSubscribe方法,传Subscription进来。这个Subscription里面的 request 用于请求Publisher发送多少 item 过来,cancel 用于告诉Publisher不要再发 item 过来了。每次Publisher有 item 生成并且没有超过Subscription request 的个数限制,onNext方法会被调用用于发送这个 item。当有异常发生时,onError 就会被调用。当Publisher判断不会有新的 item 或者异常发生的时候,就会调用onComplete告诉Subscriber消费完成了。大体上就是这么个流程。


Project Reactor 就是Flow的一种实现。并且在Flow这个模型的基础上,参考了 Java 8 Stream 的接口功能设计,加入了流处理的机制。


Project Reactor - Flux如何实现Flow的接口


Flux就是一串相同类型数据的流,他包括并且会发射 0~n 个对象,例如:


Flux<String> just = Flux.just("1", "2", "3");


这样,我们就生成了一个包含三个字符串的Flux流(底层实现实际上就是FluxArray,这个我们以后会说的)


然后,我们按照之前 Flow 里面提到的流程,先进行简单的 subscribe


Flux.just("test1", "test2", "test3")
    //打印详细流日志
    .log()
    //订阅消费
    .subscribe(System.out::println);


运行代码,我们会看到日志输出:


07:08:13.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:08:13.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1)
test1
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2)
test2
07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3)
test3
07:08:13.824 [main] INFO reactor.Flux.Array.1 - | onComplete()


这些日志很清楚的说明了subscribe究竟是如何工作的:

  1. 首先在subscribe的同时,onSubscribe首先被调用
  2. 然后调用request(unbounded),这里request代表请求多少个数据,unbounded代表请求无限个,就是所有的数据
  3. 对于每个数据对象,调用onNext方法:onNext(test1),onNext(test2),onNext(test3)
  4. 在最后完成的时候,onComplete会被调用,如果说遇到了异常,那么onError会被调用,就不会调用onComplete了 这些方法其实都是Subscriber的方法,Subscriber是Flux的订阅者,配置订阅者如何消费以及消费的具体操作。


Subscriber<String> subscriber = new Subscriber<String>() {
    //在订阅成功的时候,如何操作
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    //对于每个元素的操作
    @Override
    public void onNext(String o) {
        System.out.println(o);
    }
    //在发生错误的时候
    @Override
    public void onError(Throwable throwable) {
        log.error("error: {}", throwable.getMessage(), throwable);
    }
    //在完成的时候,发生错误不算完成
    @Override
    public void onComplete() {
        log.info("complete");
    }
};
Flux.just("test1", "test2", "test3")
    //打印详细流日志
    .log()
    //订阅消费
    .subscribe(subscriber);


运行后,日志是:


07:28:27.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
07:28:27.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded)
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1)
test1
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2)
test2
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3)
test3
07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onComplete()
07:28:27.235 [main] INFO com.test.TestMonoFlux - complete


subscribe还有如下几个api:


//在不需要消费,只需要启动Flux中间处理的话,用这个
subscribe();
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
};
//指定消费者消费
subscribe(Consumer<? super T> consumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
};
//指定消费者,还有异常处理者
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
    }
};
//指定消费者,异常处理着还有完成的时候的要执行的操作
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        //取最大数量的元素个数
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
        completeConsumer.run();
    }
};
//指定Subscriber所有需要的元素
subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); 
//相当于:
new Subscriber() {
    @Override
    public void onSubscribe(Subscription subscription) {
        subscriptionConsumer.accept(subscription);
    }
    @Override
    public void onNext(Object o) {
        consumer.accept(o);
    }
    @Override
    public void onError(Throwable throwable) {
        errorConsumer.accept(throwable);
    }
    @Override
    public void onComplete() {
        completeConsumer.run();
    }
};


这样,就和之前所说的Flow的设计对应起来了。

相关文章
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
62 4
|
3月前
|
机器学习/深度学习 自然语言处理 JavaScript
信息论、机器学习的核心概念:熵、KL散度、JS散度和Renyi散度的深度解析及应用
在信息论、机器学习和统计学领域中,KL散度(Kullback-Leibler散度)是量化概率分布差异的关键概念。本文深入探讨了KL散度及其相关概念,包括Jensen-Shannon散度和Renyi散度。KL散度用于衡量两个概率分布之间的差异,而Jensen-Shannon散度则提供了一种对称的度量方式。Renyi散度通过可调参数α,提供了更灵活的散度度量。这些概念不仅在理论研究中至关重要,在实际应用中也广泛用于数据压缩、变分自编码器、强化学习等领域。通过分析电子商务中的数据漂移实例,展示了这些散度指标在捕捉数据分布变化方面的独特优势,为企业提供了数据驱动的决策支持。
197 2
信息论、机器学习的核心概念:熵、KL散度、JS散度和Renyi散度的深度解析及应用
|
2月前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
64 1
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
99 2
|
3月前
|
存储 NoSQL MongoDB
MongoDB 概念解析
10月更文挑战第12天
51 0
MongoDB 概念解析
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
Transformer图解以及相关的概念解析
前言 transformer是目前NLP甚至是整个深度学习领域不能不提到的框架,同时大部分LLM也是使用其进行训练生成模型,所以transformer几乎是目前每一个机器人开发者或者人工智能开发者不能越过的一个框架。接下来本文将从顶层往下去一步步掀开transformer的面纱。 transformer概述 Transformer模型来自论文Attention Is All You Need。 在论文中最初是为了提高机器翻译的效率,它使用了Self-Attention机制和Position Encoding去替代RNN。后来大家发现Self-Attention的效果很好,并且在其它的地
|
3月前
|
JSON 关系型数据库 API
ElasticSearch 的概念解析与使用方式(二)
ElasticSearch 的概念解析与使用方式(二)
46 1
|
3月前
|
存储 搜索推荐 Java
ElasticSearch 的概念解析与使用方式(一)
ElasticSearch 的概念解析与使用方式(一)
82 1
|
3月前
|
供应链 网络协议 数据安全/隐私保护
|
3月前
|
前端开发 JavaScript Shell
深入解析前端构建利器:webpack核心概念与基本功能全览
深入解析前端构建利器:webpack核心概念与基本功能全览—
38 0

推荐镜像

更多