谷粒商城笔记+踩坑(14)——异步和线程池

简介: 初始化线程的4种方式、线程池详解、异步编排 CompletableFuture

 导航:

谷粒商城笔记+踩坑汇总篇

Java笔记汇总:

【Java笔记+踩坑汇总】Java基础+JavaWeb+SSM+SpringBoot+SpringCloud+瑞吉外卖/谷粒商城/学成在线+设计模式+面试题汇总+性能调优/架构设计+源码解析-CSDN博客

目录

1. 初始化线程的4种方式

1.1 继承 Thread类,重写run()方法

1.2 实现 Runnable 接口,重写run()方法

1.3 实现 Callable 接口 , FutureTask (可以拿到返回结果, 可以处理异常)

1.4 创建线程池直接提交任务(推荐),

1.5 四种创建线程方法的区别

2. 线程池详解

2.1 创建线程池方法1:执行器工具类创建线程池

2.1.1 执行器工具类 Executors创建线程池

2.1.2 线程池execute和submit区别(向线程池提交任务)

2.1.3 执行器工具类的4种线程池

2.2 创建线程池方法2(推荐):创建自定义线程池

2.2.1 线程池执行器ThreadPoolExecutor创建自定义线程池

2.2.2 构造方法的七个参数

2.2.3 线程池执行任务流程

2.3 使用线程池的好处

3. 异步编排 CompletableFuture

3.1 简介

3.2 创建异步对象,runAsync|supplyAsync

3.2.1 创建异步对象的方法

3.2.2 runAsync,不带线程返回值

3.2.3 supplyAsync 带线程返回值

3.3 线程结果感知和处理,使用whenCompleteAsync与exceptionally

3.3.1 简介

3.3.2 感知结果和异常但不处理,whenCompleteAsync

3.3.3 感知结果和异常并处理异常,whenCompleteAsync和exceptionally

3.4 线程结果感知和处理(推荐), handle 方法

3.5 线程串行化方法

3.5.0 简介

3.5.1 thenRunAsync

3.5.2 thenAcceptAsync

3.5.3 thenApplyAsync

3.6 两任务组合 - 都要完成

3.6.0 概述

3.6.1 runAfterBothAsync,不获取结果并处理新任务

3.6.2 thenAcceptBothAsync,获取结果并处理新任务

3.6.3 thenCombineAsync,,获取结果并获得新任务结果

3.7 两个任务 - 一个完成

3.7.0 概述

3.7.1 runAfterEitherAsync,不获取结果, 新任务无返回值。

3.7.2 acceptEitherAsync,获取结果, 新任务无返回值。

3.7.3 applyToEitherAsync,获取结果, 新任务有返回值。

3.8 多任务组合

3.8.0 概述

3.8.1 allOf,等待所有任务完成

3.8.2 anyOf,只要有一个任务完成


1. 初始化线程的4种方式

1.1 继承 Thread类,重写run()方法

package site.xxx.gilimall.search.thread;
public class ThreadTest {
    public static void main(String[] args) {
        System.out.println("main......start...");
        Thread01 thread01 = new Thread01();
        thread01.start();
        System.out.println("main......end...");
    }
//继承 Thread类,重写run()方法
    public static class Thread01 extends Thread{
        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            Integer i=10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

image.gif

运行结果

image.gif 编辑

1.2 实现 Runnable 接口,重写run()方法

package site.xxx.gilimall.search.thread;
public class ThreadTest {
    public static void main(String[] args) {
        Thread02 thread02 = new Thread02();
        new Thread(thread02).start();
        System.out.println("main......end...");
    }
//实现 Runnable 接口,重写run()方法
    public static class Thread02 implements Runnable{
        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            Integer i=12/2;
            System.out.println("运行结果:"+i);
        }
    }
}

image.gif

image.gif 编辑

1.3 实现 Callable 接口 , FutureTask (可以拿到返回结果, 可以处理异常)

package site.xxx.gilimall.search.thread;
public class ThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main......start...");
        //可以拿到返回结果, 可以处理异常
        FutureTask futureTask = new FutureTask<>(new Thread03());
        new Thread(futureTask).start();
        Integer i = (Integer) futureTask.get();
        System.out.println("main......end..."+i);
    }
//实现 Callable 接口
    public static class Thread03 implements Callable {
        @Override
        public Object call() throws Exception {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            Integer i=14/2;
            System.out.println("运行结果:"+i);
            return i;
        }
    }
}

image.gif

image.gif 编辑

注意

实现callable的方法可以拿到返回值

FutureTask继承了Runnable

image.gif 编辑

1.4 创建线程池直接提交任务(推荐),

我们以后再业务代码里面,前三种启动线程的方式都不用。将所有的多线程异步任务都交给线程池执行。

创建一个固定类型的线程池

ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread());
            }
        });

image.gif

1.5 四种创建线程方法的区别

区别:

  • 1、2不能得到返回值。3可以获取返回值
  • 1、2、3都不能控制资源
  • 4可以控制资源,性能稳定,不会一下子所有线程一起运行

总结:

1、实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】

2、当前系统中线程池只有一两个,每个异步任务提交给线程池让他自己去执行

2. 线程池详解

2.1 创建线程池方法1:执行器工具类创建线程池

2.1.1 执行器工具类 Executors创建线程池

Executors译为执行器、线程池。

ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread());
            }
        });

image.gif

2.1.2 线程池execute和submit区别(向线程池提交任务)

execute:参数只能是Runnable,没有返回值

submit:参数可以是Runnable、Callable,返回值是FutureTask

注意:声明的线程池必须是全局的不然起不到效果

2.1.3 执行器工具类的4种线程池

1、newCachedThreadPool:缓存线程池。核心线程数是0,如果空闲会回收所有线程

创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程, 若无可回收, 则新建线程。

2、newFixedThreadPool:固定大小的线程池。核心线程数 = 最大线程数,【不回收】

创建一个定长线程池, 可控制线程最大并发数, 超出的线程会在队列中等待。

3、newScheduledThreadPool:定时任务线程池。多久之后执行【可提交核心线程数,最大线程数是Integer.Max】

创建一个定长线程池, 支持定时及周期性任务执行。

4、newSingleThreadPool:单线程化的线程池。核心与最大都只有一个【不回收】,后台从队列中获取任务

创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

2.2 创建线程池方法2(推荐):创建自定义线程池

2.2.1 线程池执行器ThreadPoolExecutor创建自定义线程池

ThreadPoolExecutor executor = new ThreadPoolExecutor( 
           5,    //核心线程数
                 200,    //最大线程数量,控制资源并发
                 10,    //存活时间
                TimeUnit.SECONDS,    //时间单位
                new LinkedBlockingDeque<>(  100000),    //任务队列,大小100000个
        Executors.defaultThreadFactory(),    //线程的创建工厂
        new ThreadPoolExecutor.AbortPolicy());    //拒绝策略
        // 任务1
        executor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("--helloWorld_001--" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

image.gif

2.2.2 构造方法的七个参数

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit,workQueue, threadFactory, handler);

image.gif

  • corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。
  • maximumPoolSize:最大线程数量。阻塞队列满了
  • keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。
  • TimeUnitunit:时间单位
  • workQueue: 任务队列。如果线程数超过核心数量,就把剩余的任务放到队列里。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。
  • SynchronousQueue是一个同步队列,它不会缓存任务,而是直接将任务传递给线程处理。当队列中已经有一个元素时,再有其他元素插入到队列中就会被阻塞,直到其他线程接收任务为止。
  • LinkedBlockingQueue是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。
  • ArrayBlockingQueue是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。
  • PriorityBlockingQueue是一个优先级队列,能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。
  • threadFactory:线程的创建工厂【可以自定义】。默认的线程工厂Executors.defaultThreadFactory(),
  • RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。
  • Rejected:丢弃最老的
  • Caller:调用者同步调用,直接调用run方法,不创建线程了
  • Abort (默认):直接丢弃新任务
  • Discard:丢弃新任务,并且抛出异常

2.2.3 线程池执行任务流程(线程池原理)

任务加入时判断的顺序:核心线程数 、阻塞队列、最大线程数、拒绝策略。

线程池执原理:

  1. 新加入任务,判断corePoolSize是否到最大值;如果没到最大值就创建核心线程执行新任务,如果到最大值就判断是否有空闲的核心线程;
  2. 如果有空闲的核心线程,则空闲核心线程执行新任务,如果没空闲的核心线程,则尝试加入FIFO阻塞队列;
  3. 若加入成功,则等待空闲核心线程将队头任务取出并执行,若加入失败(例如队列满了),则判断maximumPoolSize是否到最大值;
  4. 如果没到最大值就创建非核心线程执行新任务,如果到了最大值就执行丢弃策略,默认丢弃新任务;
  5. 线程数大于corePoolSize时,空闲线程将在keepAliveTime后回收,直到线程数等于核心线程数。这些核心线程也不会被回收。

实际上线程本身没有核心和非核心的概念,都是靠比较corePoolSize和当前线程数判断一个线程是不是能看作核心线程。

可能某个线程之前被看作是核心线程,等它空闲了,线程池又有corePoolSize个线程在执行任务,这个线程到keepAliveTime后还是会被回收。

练习:

一个线程池 core 7; max 20 , queue: 50, 100 并发进来怎么分配的;

先有 7 个能直接得到执行, 接下来 50 个进入队列排队, 在多开 13 个继续执行。 现在 70 个任务已经被安排上了,剩下 30 个默认拒绝策略。

拒绝策略

1、丢弃最老的 Rejected

2、调用者同步调用,直接调用run方法,不创建线程了 Caller

3、直接丢弃新任务 Abort 【默认使用这个】

4、丢弃新任务,并且抛出异常 Discard

image.gif 编辑

2.3 使用线程池的好处

1、降低资源的消耗【减少创建销毁线程的开销】

通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

2、提高响应速度【控制线程个数】

因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【短信等】,关闭非核心线程池释放内存资源

线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

3. 异步编排 CompletableFuture

3.1 简介

Completable译为“可完备化的” 。

CompletableFuture提供了非常强大的Future接口的扩展功能可以简化异步编程

提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口, 所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用。

CompletableFuture 和 FutureTask ( 构造参数为Callable实现类)同属于 Future 接口的实现类, 都可以获取线程的执行结果。

image.gif 编辑

示例,使用异步可以缩短响应时间:

image.gif 编辑

查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

// 1.获取sku的基本信息0.5s
// 2.获取sku的图片信息0.5s
// 3.获取sku的促销信息1s
// 4.获取spu的所有销售属性 1s
// 5.获取规格参数组及组下的规格参数1.5s
// 6.spu详情1s

image.gif

假如商品详情页的每个查询,需要如下标注的时间才能完成那么,用户需要 6.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6步操作,只需要 1.5s 即可完成响应。

3.2 创建异步对象,runAsync|supplyAsync

3.2.1 创建异步对象的方法

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
//无线程返回值,指定线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//有线程返回值,指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

image.gif

1、 runAsync 都是没有线程返回结果的, supplyAsync 都是可以获取线程返回结果的

2、 可以传入自定义的线程池, 否则就用默认的线程池;

3、Async代表异步方法

3.2.2 runAsync,不带线程返回值

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果...."+i);
        }, executor);
    }
}

image.gif

image.gif 编辑

3.2.3 supplyAsync 带线程返回值

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("运行结果...." + i);
            return i;    //有返回值
        }, executor);
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }
}

image.gif

image.gif 编辑

3.3 线程结果感知和处理,使用whenCompleteAsync与exceptionally

3.3.1 简介

public CompletableFuture whencomplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer <? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor);
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn);

image.gif

  • whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete: 是执行当前任务的线程执行继续执行 whenComplete 的任务。

whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池

来进行执行。

  • 方法不以 Async 结尾, 意味着 Action 使用相同的线程执行, 而 Async 可能会使用其他线程执行(如果是使用相同的线程池, 也可能会被同一个线程选中执行)

3.3.2 感知结果和异常但不处理,whenCompleteAsync

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("运行结果...." + i);
            return i;
        }, executor).whenCompleteAsync((res, exception) -> {    //第一个参数是结果,第二个参数是异常
            System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);
        },executor);
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }
}

image.gif

image.gif 编辑

模拟异常情况,将线程里计算改成“12/0”

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 0;
            System.out.println("运行结果...." + i);
            return i;
        }, executor).whenCompleteAsync((res, exception) -> {
            System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);
        },executor);
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }
}
image.gif

image.gif 编辑

此处虽然得到了异常信息但是没有办法修改返回数据,使用exceptionally自定义异常时的返回值

3.3.3 感知结果和异常并处理异常,whenCompleteAsync和exceptionally

异常情况处理

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 0;
            System.out.println("运行结果...." + i);
            return i;
//虽然能得到异常信息,但无法修改返回结果
        }, executor).whenCompleteAsync((res, exception) -> {
            System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);
//可以感知异常,并返回自定义默认值
        },executor).exceptionally(throwable -> {
            return 0;
        });
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }
}

image.gif

image.gif 编辑

无异常情况,正常返回不会进exceptionally,也就不会处理异常:

public class ThreadTest {
    //        ExecutorService executorService = Executors.newFixedThreadPool(10);
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(  100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("运行结果...." + i);
            return i;
        }, executor).whenCompleteAsync((res, exception) -> {
            System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);
        },executor).exceptionally(throwable -> {
            return 0;
        });
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }
}
image.gif

image.gif 编辑

3.4 线程结果感知和处理(推荐), handle 方法

和 complete 一样, 可对结果做最后的处理(可处理异常),可改变返回值。

总结:使用R apply(T t, U u); 可以感知异常,和修改返回值的功能。

有异常情况

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 0;
            System.out.println("运行结果...." + i);
            return i;
        //处理方法执行结果
        }, executor).handleAsync((res, throwable) -> {
            if (res!=null){
                return res*2;
            }
            if (throwable!=null){
                System.out.println("出现异常"+throwable.getMessage());
                return -1;
            }
            return 0;
        },executor);
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }

image.gif

image.gif 编辑

无异常情况

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 12 / 6;
            System.out.println("运行结果...." + i);
            return i;
        }, executor).handleAsync((res, throwable) -> {
            if (res!=null){
                return res*2;
            }
            if (throwable!=null){
                System.out.println("出现异常"+throwable.getMessage());
                return -1;
            }
            return 0;
        },executor);
        Integer integer = supplyAsync.get();
        System.out.println("返回数据:"+integer);
    }

image.gif

image.gif 编辑

总结:

一般用handle,因为whencomplete如果异常不能给定默认返回结果,需要再调用exceptionally,而handle可以

该方法作用:获得前一任务的返回值【自己也可以是异步执行的】,也可以处理上一任务的异常,调用exceptionally修改前一任务的返回值【例如异常情况时给一个默认返回值】而handle方法可以简化操作

3.5 线程串行化方法

3.5.0 简介

image.gif 编辑

  • thenRun:继续执行,不接受上一个任务的返回结果,自己执行完没有返回结果
  • thenAccept:继续执行,接受上一个任务的返回结果,自己执行完没有返回结果
  • thenApply:继续执行,接受上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
  • 以上都要前置任务成功完成。
    Function<? super T,? extends U>
    T: 上一个任务返回结果的类型
    U: 当前任务的返回值类型

3.5.1 thenRunAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void>  runAsync= CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果...."+i);
        }, executor).thenRunAsync(() -> {
            System.out.println("任务二启动了...");
        },executor);
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.5.2 thenAcceptAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void>  supplyAsync= CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:"+Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果...."+i);
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println("任务二启动了..."+"拿到了上一步的结果:"+res);
        },executor);
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.5.3 thenApplyAsync

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            System.out.println("运行结果...." + i);
            return i;
        }, executor).thenApplyAsync(res -> {
            System.out.println("任务二启动了..." + "拿到了上一步的结果:" + res);
            return res*2;
        }, executor);
        Integer integer = future.get();
        System.out.println("返回数据:"+integer);
    }

image.gif

image.gif 编辑

3.6 两任务组合 - 都要完成

3.6.0 概述

runAfterBoth:组合两个future,不需要获取之前任务future的结果,只需两个future处理完任务后,处理该任务。

thenAcceptBoth:组合两个future,获取前两个future任务的返回结果,然后处理任务,没有返回值。

thenCombine:组合两个future,获取前两个future的返回结果,并返回当前任务的返回值

public <U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
public CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
image.gif

3.6.1 runAfterBothAsync,不获取结果并处理新任务

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        future01.runAfterBothAsync(future02,() -> {
            System.out.println("任务三开始...");
        });
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.6.2 thenAcceptBothAsync,获取结果并处理新任务

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        future01.thenAcceptBothAsync(future02,(res1, res2) -> {
            System.out.println("任务一返回值:"+res1+"任务二返回值:"+res2);
        });
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.6.3 thenCombineAsync,,获取结果并获得新任务结果

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        CompletableFuture<String> future = future01.thenCombineAsync(future02, (res1, res2) -> {
            System.out.println("任务一返回值:" + res1 + "任务二返回值:" + res2);
            return res1 + (String) res2;
        }, executor);
        System.out.println("返回数据:"+future.get());
    }

image.gif

image.gif 编辑

3.7 两个任务 - 一个完成

3.7.0 概述

runAfterEither: 两个任务有一个执行完成, 不需要获取 future 的结果, 处理任务, 也没有返回值。

acceptEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务, 没有新的返回值。

applyToEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务并有新的返回值。

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}
public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(asyncPool, other, action);
}
public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}
image.gif

3.7.1 runAfterEitherAsync,不获取结果, 新任务无返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        future01.runAfterEitherAsync(future02,() -> {
            System.out.println("任务三线程开始:" + Thread.currentThread().getName());
        },executor);
        System.out.println("返回数据:");
    }

image.gif

测试发现,线程二睡了3秒钟,但是线程一完成了,达成runAfterEitherAsync执行条件,线程二就不继续执行了

image.gif 编辑

3.7.2 acceptEitherAsync,获取结果, 新任务无返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
         future02.acceptEitherAsync(future01,res ->{
            System.out.println("任务三线程开始:" + Thread.currentThread().getName()+"拿到上次任务的结果:"+res);
        },executor);
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.7.3 applyToEitherAsync,获取结果, 新任务有返回值。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        CompletableFuture<String> future = future02.applyToEitherAsync(future01, res -> {
            System.out.println("任务三线程开始:" + Thread.currentThread().getName() + "拿到上次任务的结果:" + res);
            return res + "t3";
        }, executor);
        System.out.println("返回数据:"+future.get());
    }

image.gif

image.gif 编辑

3.8 多任务组合

3.8.0 概述

//allOf: 等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}
//anyOf: 只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

image.gif

3.8.1 allOf,等待所有任务完成

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务三线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务三运行结束....");
            return "hello2";
        }, executor);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
        allOf.get();//等待所有任务完成
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑

3.8.2 anyOf,只要有一个任务完成

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务一线程开始:" + Thread.currentThread().getName());
            int i = 12 / 2;
            System.out.println("任务一运行结束...." + i);
            return i;
        }, executor);
        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务二线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务二运行结束....");
            return "hello";
        }, executor);
        CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务三线程开始:" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务三运行结束....");
            return "hello2";
        }, executor);
         CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);
        anyOf.get();//等待其中之一任务完成
        System.out.println("返回数据:");
    }

image.gif

image.gif 编辑


相关文章
|
2月前
|
编解码 数据安全/隐私保护 计算机视觉
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
如何使用OpenCV进行同步和异步操作来打开海康摄像头,并提供了相关的代码示例。
114 1
Opencv学习笔记(十):同步和异步(多线程)操作打开海康摄像头
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
2月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
81 0
|
4月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
4月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
4月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
4月前
|
数据采集 Python
多线程和异步
【8月更文挑战第12天】
44 3
|
5月前
|
Java Spring 容器
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
Spring boot 自定义ThreadPoolTaskExecutor 线程池并进行异步操作
254 3
|
4月前
|
Dart API C语言
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
Dart ffi 使用问题之想在C/C++中创建异步线程来调用Dart方法,如何操作
|
5月前
|
Java
不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor
不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor