Reactor中的Thread和Scheduler

简介: Reactor中的Thread和Scheduler

目录



简介



今天我们要介绍的是Reactor中的多线程模型和定时器模型,Reactor之前我们已经介绍过了,它实际上是观察者模式的延伸。


所以从本质上来说,Reactor是和多线程无关的。你可以把它用在多线程或者不用在多线程。


今天将会给大家介绍一下如何在Reactor中使用多线程和定时器模型。


Thread多线程



先看一下之前举的Flux的创建的例子:


Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });
        flux.subscribe(System.out::println);


可以看到,不管是Flux generator还是subscriber,他们实际上都是运行在同一个线程中的。


如果我们想让subscribe发生在一个新的线程中,我们需要新启动一个线程,然后在线程内部进行subscribe操作。


Mono<String> mono = Mono.just("hello ");
        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();


上面的例子中,Mono在主线程中创建,而subscribe发生在新启动的Thread中。


Schedule定时器



很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。


Scheduler是一个接口:


public interface Scheduler extends Disposable


它定义了一些定时器中必须要实现的方法:


比如立即执行的:


Disposable schedule(Runnable task);


延时执行的:


default Disposable schedule(Runnable task, long delay, TimeUnit unit)


和定期执行的:


default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)


Schedule有一个工具类叫做Schedules,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。


简单点看Schedule就是对ExecutorService的封装。


Schedulers工具类



Schedulers工具类提供了很多个有用的工具类,我们来详细介绍一下:


Schedulers.immediate():


提交的Runnable将会立马在当前线程执行。


Schedulers.single():


使用同一个线程来执行所有的任务。


Schedulers.boundedElastic():


创建一个可重用的线程池,如果线程池中的线程在长时间内都没有被使用,那么将会被回收。boundedElastic会有一个最大的线程个数,一般来说是CPU cores x 10。 如果目前没有可用的worker线程,提交的任务将会被放入队列等待。


Schedulers.parallel():


创建固定个数的工作线程,个数和CPU的核数相关。


Schedulers.fromExecutorService(ExecutorService):


从一个现有的线程池创建Scheduler。


Schedulers.newXXX:


Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。


我们看一个Schedulers的具体应用,我们可以指定特定的Scheduler来产生元素:


Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))


publishOn 和 subscribeOn


publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。


先讲一个结论,就是在链式调用中,publishOn可以切换Scheduler,但是subscribeOn并不会起作用。


这是因为真正的publish-subscribe关系只有在subscriber开始subscribe的时候才建立。

下面我们来具体看一下这两个方法的使用情况:


publishOn



publishOn可以在链式调用的过程中,进行publish的切换:


@Test
    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());
        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }


上面我们创建了一个名字为parallel-scheduler的scheduler。


然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler,最后右执行了一次map操作。


最后,我们采用一个新的线程来进行subscribe的输出。


先看下输出结果:


Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]


可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。


那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。


subscribeOn



subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。


我们看一个例子:


@Test
    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());
        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }


同样的,上面的例子中,我们使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。


看下输出结果:


value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]


可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。


本文的例子learn-reactive

相关文章
|
4月前
|
资源调度
在SchedulerX中,你可以使用`schedulerx.output()`函数来向Worker报告运行结果
【1月更文挑战第7天】【1月更文挑战第35篇】在SchedulerX中,你可以使用`schedulerx.output()`函数来向Worker报告运行结果
20 1
|
5月前
|
存储 JavaScript 前端开发
RxJS中的调度器(Scheduler)机制
RxJS中的调度器(Scheduler)机制
53 0
|
5月前
|
资源调度 分布式计算 算法
Gang Scheduling
Gang Scheduling(Coscheduling)、FIFO Scheduling、Capacity Scheduling、Fair sharing、Binpack/Spread等是云计算和分布式系统中的任务调度算法,用于在资源有限的情况下,公平、高效地分配任务和资源。下面是这些调度算法的基本介绍和如何在实际应用中使用它们的一些建议:
99 2
|
7月前
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
25 0
|
8月前
|
Java 索引
并发编程——ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是ThreadPoolExecutor的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。
31 0
|
10月前
|
SQL 人工智能 缓存
|
Java 调度
ScheduledExecutorService使用介绍
JUC包(java.util.concurrent)中提供了对定时任务的支持,即ScheduledExecutorService接口。 本文对ScheduledExecutorService的介绍,将基于Timer类使用介绍进行,因此请先阅读Timer类使用介绍文章。
987 1
|
Java 调度
高并发之——ScheduledThreadPoolExecutor与Timer的区别和简单示例
DK 1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能。在JDK 1.5之前,实现任务的周期性调度主要使用的是Timer类和TimerTask类。本文,就简单介绍下ScheduledThreadPoolExecutor类与Timer类的区别,ScheduledThreadPoolExecutor类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。
172 0
|
Java 调度
【高并发】ScheduledThreadPoolExecutor与Timer的区别和简单示例
JDK 1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承ThreadPoolExecutor类重用线程池实现了任务的周期性调度功能。在JDK 1.5之前,实现任务的周期性调度主要使用的是Timer类和TimerTask类。本文,就简单介绍下ScheduledThreadPoolExecutor类与Timer类的区别,ScheduledThreadPoolExecutor类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。
248 0
【高并发】ScheduledThreadPoolExecutor与Timer的区别和简单示例