java的反应式流

简介: 本文介绍了Java反应式流的概念和编程模型,通过使用Java自身的Flow API和Reactor库,展示了如何实现发布者-订阅者模式,并提供了实际的代码示例。

Java的反应式流是一种新的编程模型,它在异步和事件驱动的环境下工作。反应式流的目的是为了解决传统的单线程或者多线程编程模型在高并发和大流量情况下的性能瓶颈。

反应式流的核心是Observable和Observer,Observable表示一个数据流,而Observer则表示这个数据流的消费者。Observable在数据流上产生事件,而Observer则对这些事件进行响应。反应式流的数据流是一种推式的流,Observable发布事件时不需要等待Observer接收,Observable会把事件推送给Observer,而不是Observer去轮询Observable。

Java的反应式流通常基于Reactor或RxJava等库,这些库提供了丰富的函数式编程API和运算符,可以非常方便地处理异步事件。这些库都提供了类似于Observable和Observer的抽象概念,可以用来描述和处理异步数据流。同时还提供了常用的运算符,包括map、filter、reduce等,这些运算符可以方便地对数据流进行变换和过滤。

反应式流还有一个重要的概念是背压(backpressure),它是指在高并发和大流量情况下,消费者无法处理生产者产生的数据流,导致数据积压的情况。为了解决这个问题,反应式流引入了背压机制,生产者会在发送数据前先询问消费者的处理能力,如果消费者没有处理能力,生产者会等待一段时间或者缓存数据,等待消费者处理完数据后再继续发送。

反应式流已经被广泛应用于大规模的互联网应用中,包括机器学习、数据分析、网络爬虫等领域。它的优点在于处理高并发和大流量的数据流时,能够更加高效地利用系统资源,提高系统的性能和可扩展性。

总之,反应式流是Java编程中的一个重要概念,它可以帮助我们更好地处理异步和事件驱动的数据流,提高系统的性能和可扩展性。

不涉及任何库,就单纯用java的反应式流,完成发布订阅者模式:

package com.example.jdk9.react;

import java.util.concurrent.Flow.*;

public class PublisherSubscriberDemo {
   
    public static void main(String[] args) {
   
        SimplePublisher<String> publisher = new SimplePublisher<>();
        SimpleSubscriber<String> subscriber1 = new SimpleSubscriber<>();
        SimpleSubscriber<String> subscriber2 = new SimpleSubscriber<>();
        publisher.subscribe(subscriber1);
        publisher.subscribe(subscriber2);
        publisher.submit("hello");
        publisher.submit("world");
        publisher.close();
    }
}

class SimplePublisher<T> implements Publisher<T> {
   
    private Subscription subscription;

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
   
        subscriber.onSubscribe(new Subscription() {
   

            @Override
            public void request(long n) {
   

            }

            @Override
            public void cancel() {
   

                // nothing to do
            }
        });

        this.subscription = new Subscription() {
   
            private boolean cancelled = false;

            @Override
            public void request(long n) {
   
                // nothing to do
            }

            @Override
            public void cancel() {
   
                this.cancelled = true;
            }

            public boolean isCancelled() {
   
                return this.cancelled;
            }
        };

        subscriber.onSubscribe(this.subscription);
    }

    public void submit(T item) {
   
        subscriptionLimitedQueue.offer(item);
        subscription.request(1);
    }

    public void close() {
   
        while (!subscriptionLimitedQueue.isEmpty()) {
   
           subscriptionLimitedQueue.poll();
        }
        subscription.cancel();
    }

    private SubscriptionLimitedQueue<T> subscriptionLimitedQueue = new SubscriptionLimitedQueue<>(2);

    static class SubscriptionLimitedQueue<T> {
   
        private final int limit;
        private int size = 0;
        private Node<T> head;
        private Node<T> tail;

        public SubscriptionLimitedQueue(int limit) {
   
            this.limit = limit;
        }

        private static class Node<T> {
   
            final T item;
            Node<T> next;

            Node(T item, Node<T> next) {
   
                this.item = item;
                this.next = next;
            }
        }

        public void offer(T item) {
   
            Node<T> node = new Node<>(item, null);
            if (head == null) {
   
                head = node;
                tail = head;
            } else {
   
                tail.next = node;
                tail = tail.next;
            }
            size++;
            if (size > limit) {
   
                Node<T> newHead = head.next;
                head.next = null;
                head = newHead;
                size--;
            }
        }

        public boolean isEmpty() {
   
            return size == 0;
        }

        public T poll() {
   
            if (isEmpty()) {
   
                return null;
            }
            T item = head.item;
            Node<T> newHead = head.next;
            head.next = null;
            head = newHead;
            size--;
            return item;
        }
    }
}

class SimpleSubscriber<T> implements Subscriber<T> {
   
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
   
        this.subscription = subscription;
        System.out.println("订阅成功");
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
   
        System.out.println("Received item: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
   
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
   
        System.out.println("Done");
    }
}

这段代码演示了使用Flow API来发布和订阅消息的过程,它包含以下类和接口:

  1. Publisher:发布者接口,表示能够发布指定类型的消息给订阅者。
  2. Subscriber:订阅者接口,表示能够接收指定类型的消息。
  3. Subscription:订阅接口,表示订阅关系,能够请求一定数量的消息和取消订阅。
  4. SubmissionPublisher:继承自Publisher接口,实现了异步发布消息的能力。
  5. Flow API:一组用于处理数据流和异步操作的接口和类。

具体解释:

  1. SimplePublisher类是一个实现了Publisher接口的简单发布者类,它能够发布指定类型的消息给订阅者。它内部维护了一个SubscriptionLimitedQueue类的对象,用于限制消息队列的长度。
  2. SubscriptionLimitedQueue类是一个维护队列长度的类,用于实现限制消息队列长度的功能。
  3. SimpleSubscriber类是一个实现了Subscriber接口的简单订阅者类,它能够接收指定类型的消息,并将其输出到控制台中。
  4. main方法创建了一个SimplePublisher类的实例和一个SimpleSubscriber类的实例,然后将它们关联起来,最后向SimplePublisher类的实例中发布了两个消息,随后关闭了发布者。

运行结果:

例子:

第一步,引入依赖:

<dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.5.11</version>
        </dependency>

第二步,编写代码:

package com.example.jdk9.react;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveStreamExample {
    public static void main(String[] args) {
        Flux<Integer> stream = Flux.range(1, 10);

        stream
            .map(i -> i * 2)
            .filter(i -> i % 3 == 0)
            .flatMap(i -> Mono.just(i).zipWith(Mono.just(i * 3)))
            .subscribe(System.out::println);
    }
}

上面的代码首先创建了一个从1到10的数字列表,然后通过map操作符将每个元素乘以2,再使用filter操作符过滤掉不能被3整除的元素。接下来,使用flatMap操作符来创建一个新的流,该流将原始元素和该元素乘以3的结果合并在一起。最后,使用subscribe方法来订阅这个流并打印出每个元素的值。

这个例子展示了Reactor库中的一些常见操作符,包括mapfilterflatMap。通过这些操作符的链式调用,我们可以轻松地对数据流进行复杂的操作。在实际的应用中,我们可以根据具体的需求选择不同的操作符来实现所需的数据处理逻辑。

使用Reactor 库实现发布订阅者模式:

package com.example.jdk9.react;

import reactor.core.publisher.Flux;

public class PublisherSubscriberExample {
    public static void main(String[] args) {
        // 创建发布者
        Flux<Integer> publisher = Flux.just(1, 2, 3, 4, 5);

        // 订阅者1:打印每个元素
        publisher.subscribe(System.out::println);

        // 订阅者2:计算元素的总和并打印
        publisher.reduce(0, Integer::sum)
                .subscribe(total -> System.out.println("Sum = " + total));
    }
}
目录
相关文章
|
5月前
|
存储 分布式计算 Java
Java8实战-引入流(Stream)
Java8实战-引入流(Stream)
31 0
|
6月前
|
Java 数据处理
|
6月前
|
Java 数据处理
JAVA流概述
JAVA流概述
39 1
|
SQL 自然语言处理 分布式计算
Java8 stream流特性总结(超详细)
Java8 stream流特性总结(超详细)
219 0
|
存储 Java BI
Java流Strem 2
Java流Strem
47 0
java学会这些,我就入门啦!(基础篇五)流与IO流
java学会这些,我就入门啦!(基础篇五)流与IO流
|
Java
Java I/O流操作基础
Java I/O流操作基础
125 0
|
Java
Java I/O 流 案例
Java I/O 流 案例
65 0
|
SQL 前端开发 Java
下一篇
无影云桌面