java的反应式流

简介: 本文介绍了Java反应式流的概念和编程模型,通过使用Java自身的Flow API和Reactor库,展示了如何实现发布者-订阅者模式,并提供了实际的代码示例。

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;

import java.util.concurrent.Flow.*;

public class PublisherSubscriberDemo {
   
    public static void main(String[] args) {
   
        SimplePublisher<String> publisher = new SimplePublisher<>();
        SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();
        SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();
        publisher.subscribe(subscriber1);
        publisher.subscribe(subscriber2);
        publisher.submit("hello");
        publisher.submit("world");
        publisher.close();
    }
}

class SimplePublisher<T> implements Publisher<T> {
   
    private Subscription subscription;

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
   
        subscriber.onSubscribe(new Subscription() {
   

            @Override
            public void request(long n) {
   

            }

            @Override
            public void cancel() {
   

                // nothing to do
            }
        });

        this.subscription = new Subscription() {
   
            private boolean cancelled = false;

            @Override
            public void request(long n) {
   
                // nothing to do
            }

            @Override
            public void cancel() {
   
                this.cancelled = true;
            }

            public boolean isCancelled() {
   
                return this.cancelled;
            }
        };

        subscriber.onSubscribe(this.subscription);
    }

    public void submit(T item) {
   
        subscriptionLimitedQueue.offer(item);
        subscription.request(1);
    }

    public void close() {
   
        while (!subscriptionLimitedQueue.isEmpty()) {
   
           subscriptionLimitedQueue.poll();
        }
        subscription.cancel();
    }

    private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);

    static class SubscriptionLimitedQueue<T> {
   
        private final int limit;
        private int size = 0;
        private Node<T> head;
        private Node<T> tail;

        public SubscriptionLimitedQueue(int limit) {
   
            this.limit = limit;
        }

        private static class Node<T> {
   
            final T item;
            Node<T> next;

            Node(T item, Node<T> next) {
   
                this.item = item;
                this.next = next;
            }
        }

        public void offer(T item) {
   
            Node<T> node = new Node<>(item, null);
            if (head == null) {
   
                head = node;
                tail = head;
            } else {
   
                tail.next = node;
                tail = tail.next;
            }
            size++;
            if (size > limit) {
   
                Node<T> newHead = head.next;
                head.next = null;
                head = newHead;
                size--;
            }
        }

        public boolean isEmpty() {
   
            return size == 0;
        }

        public T poll() {
   
            if (isEmpty()) {
   
                return null;
            }
            T item = head.item;
            Node<T> newHead = head.next;
            head.next = null;
            head = newHead;
            size--;
            return item;
        }
    }
}

class SimpleSubscriber<T> implements Subscriber<T> {
   
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
   
        this.subscription = subscription;
        System.out.println("订阅成功");
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
   
        System.out.println("Received item: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
   
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
   
        System.out.println("Done");
    }
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher:继承自Publisher接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.5.11</version>
        </dependency>

第二步,编写代码:

package com.example.jdk9.react;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveStreamExample {
    public static void main(String[] args) {
        Flux<Integer> stream = Flux.range(1, 10);

        stream
            .map(i -> i * 2)
            .filter(i -> i % 3 == 0)
            .flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3)))
            .subscribe(System.out::println);
    }
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;

import reactor.core.publisher.Flux;

public class PublisherSubscriberExample {
    public static void main(String[] args) {
        // 创建发布者
        Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);

        // 订阅者1:打印每个元素
        publisher.subscribe(System.out::println);

        // 订阅者2:计算元素的总和并打印
        publisher.reduce(0, Integer::sum)
                .subscribe(total -> System.out.println("Sum = " + total));
    }
}
目录
相关文章
|
22天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
15天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
19天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2570 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
17天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
1天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
152 2
|
19天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1565 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
2天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
920 14
|
16天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
684 9
|
15天前
|
存储 监控 调度
云迁移中心CMH:助力企业高效上云实践全解析
随着云计算的发展,企业上云已成为创新发展的关键。然而,企业上云面临诸多挑战,如复杂的应用依赖梳理、成本效益分析等。阿里云推出的云迁移中心(CMH)旨在解决这些问题,提供自动化的系统调研、规划、迁移和割接等功能,简化上云过程。CMH通过评估、准备、迁移和割接四个阶段,帮助企业高效完成数字化转型。未来,CMH将继续提升智能化水平,支持更多行业和复杂环境,助力企业轻松上云。