Fork/Join
Fork/Join 是一个并行执行任务的框架,利用的分而治之的思想。
Fork 是把一个大的任务拆分成若干个小任务并行执行,Join 则是合并拆分的子任务的结果集,最终计算出大任务的结果。
所以整个 Fork/Join 的流程可以认为就是两步:
- Fork 拆分任务,直到拆分到最小粒度不可拆分为止
- Join 计算结果,把每个子任务的结果进行合并
这里我们需要介绍一下主要的几个类:
ForkJoinTask:就是我们的分治任务的抽象类
RecursiveTask:继承于 ForkJoinTask,用于计算有返回结果的任务
RecursiveAction: 继承于 ForkJoinTask,用于计算没有返回结果的任务
ForkJoinPool:用于执行 ForkJoinTask 任务的线程池,通常我们可以用 ForkJoinPool.commonPool() 去创建一个 Fork/Join 的线程池,然后用 submit 或者 invoke 去提交执行任务。
这里我们写一个测试程序,用于计算[0,999]的求和结果,所以我们写一个类继承 RecursiveTask ,并且实现他的 compute 方法。
invokeAll() 相当于每个任务都执行 fork,fork 之后会再次执行 compute 判断是否要继续拆分,如果无需拆分那么则使用 join 方法计算汇总结果。
public class ForkJoinTest { public static void main(String[] args) throws Exception { List<Integer> list = new LinkedList<>(); Integer sum = 0; for (int i = 0; i < 1000; i++) { list.add(i); sum += i; } CalculateTask task = new CalculateTask(0, list.size(), list); Future<Integer> future = ForkJoinPool.commonPool().submit(task); System.out.println("sum=" + sum + ",Fork/Join result=" + future.get()); } @Data static class CalculateTask extends RecursiveTask<Integer> { private Integer start; private Integer end; private List<Integer> list; public CalculateTask(Integer start, Integer end, List<Integer> list) { this.start = start; this.end = end; this.list = list; } @Override protected Integer compute() { Integer sum = 0; if (end - start < 200) { for (int i = start; i < end; i++) { sum += list.get(i); } } else { int middle = (start + end) / 2; System.out.println(String.format("从[%d,%d]拆分为:[%d,%d],[%d,%d]", start, end, start, middle, middle, end)); CalculateTask task1 = new CalculateTask(start, middle, list); CalculateTask task2 = new CalculateTask(middle, end, list); invokeAll(task1, task2); sum = task1.join() + task2.join(); } return sum; } } } //输出 从[0,1000]拆分为:[0,500],[500,1000] 从[0,500]拆分为:[0,250],[250,500] 从[500,1000]拆分为:[500,750],[750,1000] 从[0,250]拆分为:[0,125],[125,250] 从[250,500]拆分为:[250,375],[375,500] 从[500,750]拆分为:[500,625],[625,750] 从[750,1000]拆分为:[750,875],[875,1000] sum=499500,Fork/Join result=499500
使用完成之后,我们再来谈一下 Fork/Join 的原理。
先看 fork 的代码,调用 fork 之后,使用workQueue.push() 把任务添加到队列中,注意 push 之后调用 signalWork 唤醒一个线程去执行任务。
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } final ForkJoinPool.WorkQueue workQueue; // 工作窃取 final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
上面我们看到了 workQueue,这个其实就是我们说的工作队列,它是一个双端队列,并且有一个工作线程和他对应。
@sun.misc.Contended static final class WorkQueue { volatile int base; // 下一个出队列索引 int top; // 下一个入队列索引 ForkJoinTask<?>[] array; // 队列中的 task final ForkJoinPool pool; final ForkJoinWorkerThread owner; // 工作队列中的工作线程 volatile Thread parker; // == owner during call to park; else null volatile ForkJoinTask<?> currentJoin; // 当前join的任务 volatile ForkJoinTask<?> currentSteal; // 当前偷到的任务 }
那如果工作线程自己队列的做完了怎么办?只能傻傻地等待吗?并不是,这时候有一个叫做工作窃取的机制,所以他就会去其他线程的队列里偷一个任务来执行。
为了避免偷任务线程和自己的线程产生竞争,所以自己的工作线程是从队列头部获取任务执行,而偷任务线程则从队列尾部偷任务。
工作窃取
Executor
Executor是并发编程中重要的一环,任务创建后提交到Executor执行并最终返回结果。
Executor
任务
线程两种创建方式:Runnable和Callable。
Runnable是最初创建线程的方式,在JDK1.1的版本就已经存在,Callable则在JDK1.5版本之后加入,他们的主要区别在于Callable可以返回任务的执行结果。
任务执行
任务的执行主要靠Executor,ExecutorService继承自Executor,ThreadPoolExecutor和ScheduledThreadPoolExecutor分别实现了ExecutorService。
那说到线程池之前,我们肯定要提及到线程池的几个核心参数和原理,这个之前的文章也写到过,属于基础中的基础部分。
首先线程池有几个核心的参数概念:
- 最大线程数maximumPoolSize
- 核心线程数corePoolSize
- 活跃时间keepAliveTime
- 阻塞队列workQueue
- 拒绝策略RejectedExecutionHandler
当提交一个新任务到线程池时,具体的执行流程如下:
- 当我们提交任务,线程池会根据corePoolSize大小创建若干任务数量线程执行任务
- 当任务的数量超过corePoolSize数量,后续的任务将会进入阻塞队列阻塞排队
- 当阻塞队列也满了之后,那么将会继续创建(maximumPoolSize-corePoolSize)个数量的线程来执行任务,如果任务处理完成,maximumPoolSize-corePoolSize额外创建的线程等待keepAliveTime之后被自动销毁
- 如果达到maximumPoolSize,阻塞队列还是满的状态,那么将根据不同的拒绝策略对应处理
拒绝策略主要有四种:
- AbortPolicy:直接丢弃任务,抛出异常,这是默认策略
- CallerRunsPolicy:使用调用者所在的线程来处理任务
- DiscardOldestPolicy:丢弃等待队列中最老的任务,并执行当前任务
- DiscardPolicy:直接丢弃任务,也不抛出异常
ThreadPoolExecutor
通常为了快捷我们会用Executors工具类提供的创建线程池的方法快速地创建一个线程池出来,主要有几个方法,但是一般我们不推荐这样使用,非常容易导致出现问题,生产环境中我们一般推荐自己实现,参数自己定义,而不要使用这些方法。
创建
//创建固定线程数大小的线程池,核心线程数=最大线程数,阻塞队列长度=Integer.MAX_VALUE public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //创建只有一个线程的线程池,阻塞队列长度=Integer.MAX_VALUE public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //创建核心线程数为0,最大线程数=Integer.MAX_VALUE的线程池,阻塞队列为同步队列 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
最好的办法就是自己创建,并且指定线程名称:
new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors()*2, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("thread-name").build());
提交任务
重点说一下几个方法:
submit(Runnable task, T result):可以用于主线程和子线程之间的通信,数据共享。
submit(Runnable task):返回null,相当于调用submit(Runnable task, null)。
invokeAll(Collection<? extends Callable> tasks):批量提交任务,阻塞等待所有任务执行完成之后返回,带超时时间的则是在超时之后返回,并且取消没有执行完成的任务。
invokeAny(Collection<? extends Callable> tasks):批量提交任务,只要一个任务有返回,那么其他的任务都会被终止。
public void execute(Runnable command); //提交runnable任务,无返回 public <T> Future<T> submit(Callable<T> task); //提交callable任务,有返回 public Future<?> submit(Runnable task); //提交runnable,有返回 public <T> Future<T> submit(Runnable task, T result); //提交runnable,有返回 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); //批量提交任务 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit); public <T> T invokeAny(Collection<? extends Callable<T>> tasks); public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
关闭
shutdown:线程池状态设置为SHUTDOWN
,不再接受新任务,直接返回,线程池中任务会执行完成,遍历线程池中的线程,逐个调用interrupt方法去中断线程。
shutdownNow:线程池状态设置为STOP
,不再接受新任务,直接返回,线程池中任务会被中断,返回值为被丢弃的任务列表。
isShutdown:只要调用了shutdown或者shutdownNow,都会返回true
isTerminating:所有任务都关闭后,才返回true
public void shutdown(); public List<Runnable> shutdownNow(); public boolean isShutdown(); public boolean isTerminating();
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,从名字我们也知道,他是用于定时执行任务的线程池。
内部实现了一个DelayedWorkQueue作为任务的阻塞队列,ScheduledFutureTask 作为调度的任务,保存到队列中。
我们先看下他的构造函数,4个构造函数都不支持传队列进来,所以默认的就是使用他的内部类 DelayedWorkQueue,由于 DelayedWorkQueue 是一个无界队列,所以这里最大线程数都是设置的为 Integer.MAX,因为没有意义。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
执行定时任务的方法主要有4个,前面两个 schedule 传参区分 Runnable 和 Callable 其实并没有区别,最终 Runnable 会通过 Executors.callable(runnable, result) 转换为 Callable,本质上我们可以当做只有3个执行方法来看。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
schedule:提交一个延时任务,从时间单位为 unit 的 delay 时间开始执行,并且任务只会执行一次。
scheduleWithFixedDelay:以固定的延迟时间重复执行任务,initialDelay 表示提交任务后多长时间开始执行,delay 表示任务执行时间间隔。
scheduleAtFixedRate:以固定的时间频率重复执行任务,指的是以起始时间开始,然后以固定的时间间隔重复执行任务,initialDelay 表示提交任务后多长时间开始执行,然后从 initialDelay + N * period
执行。
这两个特别容易搞混,很难理解到底是个啥意思,记住了。
scheduleAtFixedRate 是上次执行完成之后立刻执行,scheduleWithFixedDelay 则是上次执行完成+delay 后执行。
看个例子,两个任务都会延迟1秒,然后以2秒的间隔开始重复执行,任务睡眠1秒的时间。
scheduleAtFixedRate 由于任务执行的耗时比时间间隔小,所以始终是以2秒的间隔在执行。
scheduleWithFixedDelay 因为任务耗时用了1秒,导致后面的时间间隔都成了3秒。
public class ScheduledThreadPoolTest { public static void main(String[] args) throws Exception { ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10); executorService.scheduleAtFixedRate(() -> { try { System.out.println("scheduleAtFixedRate=" + new SimpleDateFormat("HH:mm:ss").format(new Date())); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }, 1000, 2000, TimeUnit.MILLISECONDS); executorService.scheduleWithFixedDelay(() -> { try { System.err.println("scheduleWithFixedDelay=" + new SimpleDateFormat("HH:mm:ss").format(new Date())); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } }, 1000, 2000, TimeUnit.MILLISECONDS); // executorService.shutdown(); } } //输出 scheduleAtFixedRate=01:17:05 scheduleWithFixedDelay=01:17:05 scheduleAtFixedRate=01:17:07 scheduleWithFixedDelay=01:17:08 scheduleAtFixedRate=01:17:09 scheduleAtFixedRate=01:17:11 scheduleWithFixedDelay=01:17:11 scheduleAtFixedRate=01:17:13 scheduleWithFixedDelay=01:17:14 scheduleAtFixedRate=01:17:15 scheduleAtFixedRate=01:17:17 scheduleWithFixedDelay=01:17:17 scheduleAtFixedRate=01:17:19 scheduleWithFixedDelay=01:17:20 scheduleAtFixedRate=01:17:21
我们把任务耗时调整到超过时间间隔,比如改成睡眠3秒,观察输出结果。
scheduleAtFixedRate 由于任务执行的耗时比时间间隔长,按照规定上次任务执行结束之后立刻执行,所以变成以3秒的时间间隔执行。
scheduleWithFixedDelay 因为任务耗时用了3秒,导致后面的时间间隔都成了5秒。
scheduleWithFixedDelay=01:46:21 scheduleAtFixedRate=01:46:21 scheduleAtFixedRate=01:46:24 scheduleWithFixedDelay=01:46:26 scheduleAtFixedRate=01:46:27 scheduleAtFixedRate=01:46:30 scheduleWithFixedDelay=01:46:31 scheduleAtFixedRate=01:46:33 scheduleWithFixedDelay=01:46:36 scheduleAtFixedRate=01:46:36
OK,最后来说说实现原理:
- 首先我们通过调用 schedule 的几个方法,把任务添加到 ScheduledThreadPoolExecutor 去执行
- 接收到任务之后,会通过请求参数的延迟时间计算出真正需要执行任务的时间,然后把任务封装成 RunnableScheduledFuture
- 然后把封装之后的任务添加到延迟队列中,任务 ScheduledFutureTask 实现了 comparable 接口,把时间越小的任务放在队列头,如果时间一样,则会通过 sequenceNumber 去比较,也就是执行时间相同,先提交的先执行
- 最后线程池会从延迟队列中去获取任务执行,如果是一次性的任务,执行之后删除队列中的任务,如果是重复执行的,则再次计算时间,然后把任务添加到延迟队列中
CompletionService
记得上面我将 ThreadPoolExecutor 的方法吗,其中有一个 invokeAny 的方法,批量提交任务,只要有一个完成了,就直接返回,而不用一直傻傻地等,他的实现就是使用了 CompletionService ,我给你看一段源码。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); }
看到了吧,OK,在我们想试试使用这个类之前,我们先试试 invokeAny 好使不。
public class CompletionServiceTest { private static final int TOTAL = 10; private static ExecutorService executorService = Executors.newFixedThreadPool(TOTAL); public static void main(String[] args) throws Exception { testInvokeAny(); } private static void testInvokeAny() throws Exception { List<TestTask> taskList = new LinkedList<>(); for (int i = 0; i < TOTAL; i++) { taskList.add(new TestTask(i)); } String value = executorService.invokeAny(taskList, 60, TimeUnit.SECONDS); System.out.println("get value = " + value); executorService.shutdown(); } static class TestTask implements Callable<String> { private Integer index; public TestTask(Integer index) { this.index = index; } @Override public String call() throws Exception { long sleepTime = ThreadLocalRandom.current().nextInt(1000, 10000); System.out.println("task-" + index + " sleep " + sleepTime + " Ms"); Thread.sleep(sleepTime); return "task-" + index; } } } //输出 task-7 sleep 3072 Ms task-4 sleep 1186 Ms task-3 sleep 6182 Ms task-9 sleep 7411 Ms task-0 sleep 1882 Ms task-1 sleep 8274 Ms task-2 sleep 4789 Ms task-5 sleep 8894 Ms task-8 sleep 7211 Ms task-6 sleep 5959 Ms get value = task-4
看到效果了吧,耗时最短的任务返回,整个流程就结束了,那我们试试自己用 CompletionService 来实现这个效果看看。
public static void main(String[] args) throws Exception { // testInvokeAny(); testCompletionService(); } private static void testCompletionService() { CompletionService<String> completionService = new ExecutorCompletionService(executorService); List<Future> taskList = new LinkedList<>(); for (int i = 0; i < TOTAL; i++) { taskList.add(completionService.submit(new TestTask(i))); } String value = null; try { for (int i = 0; i < TOTAL; i++) { value = completionService.take().get(); if (value != null) { System.out.println("get value = " + value); break; } } } catch (Exception e) { e.printStackTrace(); } finally { taskList.forEach(task -> { task.cancel(true); }); } executorService.shutdown(); } //输出 task-4 sleep 5006 Ms task-1 sleep 4114 Ms task-2 sleep 4865 Ms task-5 sleep 1592 Ms task-3 sleep 6190 Ms task-7 sleep 2482 Ms task-8 sleep 9405 Ms task-9 sleep 8798 Ms task-6 sleep 2040 Ms task-0 sleep 2111 Ms get value = task-5
效果是一样的,我们只是实现了一个简化版的 invokeAny 功能,使用起来也挺简单的。
实现原理也挺简单的,哪个任务先完成,就把他丢到阻塞队列里,这样取任务结果的时候直接从队列里拿,肯定是拿到最新的那一个。
异步结果
通常,我们都会用 FutureTask 来获取线程异步执行的结果,基于 AQS 实现。
这个没有说太多的必要,看看几个方法就行了。
public V get(); public V get(long timeout, TimeUnit unit); public boolean cancel(boolean mayInterruptIfRunning);
get 会阻塞的获取线程异步执行的结果,一般不建议直接使用,最好是使用带超时时间的 get 方法。
我们可以通过 cancel 方法去尝试取消任务的执行,参数代表是否支持中断,如果任务未执行,那么可以直接取消,如果任务执行中,使用 cancel(true) 会尝试中断任务。
CompletableFuture
之前我们都在使用 Future,要么只能用 get 方法阻塞,要么就用 isDone 来判断,JDK1.8 之后新增了 CompletableFuture 用于异步编程,它针对 Future 的功能增加了回调能力,可以帮助我们简化异步编程。
CompletableFuture 主要包含四个静态方法去创建对象,主要区别在于 supplyAsync 返回计算结果,runAsync 不返回,另外两个方法则是可以指定线程池,如果不指定线程池则默认使用 ForkJoinPool,默认线程数为CPU核数。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
下面看看他的那些恶心人的几十个方法,我估计能疯。
串行
串行就不用解释了,A->B->C 按照顺序执行,下一个任务必须等上一个任务执行完成才可以。
主要包含 thenApply、thenAccept、thenRun 和 thenCompose,以及他们对应的带 async 的异步方法。
为了方便记忆我们要记住,有 apply 的有传参有返回值,带 accept 的有传参但是没有返回值,带 run 的啥也没有,带 compose 的会返回一个新的 CompletableFuture 实例。
public static void main(String[] args) throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread() + "工作完成"); return "supplyAsync"; } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture newFuture = future.thenApply((ret) -> { System.out.println(Thread.currentThread() + "thenApply=>" + ret); return "thenApply"; }).thenAccept((ret) -> { System.out.println(Thread.currentThread() + "thenAccept=>" + ret); }).thenRun(() -> { System.out.println(Thread.currentThread() + "thenRun"); }); CompletableFuture<String> composeFuture = future.thenCompose((ret) -> { System.out.println(Thread.currentThread() + "thenCompose=>" + ret); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread() + "thenCompose工作完成"); return "thenCompose"; } catch (InterruptedException e) { throw new RuntimeException(e); } }); }); System.out.println(future.get()); System.out.println(newFuture.get()); System.out.println(composeFuture.get()); } //输出 Thread[ForkJoinPool.commonPool-worker-9,5,main]工作完成 Thread[ForkJoinPool.commonPool-worker-9,5,main]thenCompose=>supplyAsync Thread[main,5,main]thenApply=>supplyAsync Thread[main,5,main]thenAccept=>thenApply Thread[main,5,main]thenRun supplyAsync null Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCompose工作完成 thenCompose
AND 聚合
这个意思是下一个任务执行必须等前两个任务完成可以。
主要包含 thenCombine、thenAcceptBoth、runAfterBoth ,以及他们对应的带 async 的异步方法,区别和上面一样。
public static void main(String[] args) throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); System.out.println(Thread.currentThread() + "A工作完成"); return "A"; } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); System.out.println(Thread.currentThread() + "B工作完成"); return "B"; } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture newFuture = future.thenCombine(future2, (ret1, ret2) -> { System.out.println(Thread.currentThread() + "thenCombine=>" + ret1 + "," + ret2); return "thenCombine"; }).thenAcceptBoth(future2, (ret1, ret2) -> { System.out.println(Thread.currentThread() + "thenAcceptBoth=>" + ret1 + "," + ret2); }).runAfterBoth(future2, () -> { System.out.println(Thread.currentThread() + "runAfterBoth"); }); System.out.println(future.get()); System.out.println(future2.get()); System.out.println(newFuture.get()); } //输出 Thread[ForkJoinPool.commonPool-worker-9,5,main]A工作完成 A Thread[ForkJoinPool.commonPool-worker-2,5,main]B工作完成 B Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCombine=>A,B Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptBoth=>thenCombine,B Thread[ForkJoinPool.commonPool-worker-2,5,main]runAfterBoth null
Or 聚合
Or 聚合代表只要多个任务中有一个完成了,就可以继续下面的任务。
主要包含 applyToEither、acceptEither、runAfterEither ,以及他们对应的带 async 的异步方法,区别和上面一样,不再举例了。
回调/异常处理
whenComplete、handle 代表执行完成的回调,一定会执行,exceptionally 则是任务执行发生异常的回调。
public static void main(String[] args) throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); int a = 1 / 0; return "success"; } catch (Exception e) { throw new RuntimeException(e); } }); CompletableFuture newFuture = future.handle((ret, exception) -> { System.out.println(Thread.currentThread() + "handle exception=>" + exception.getMessage()); return "handle"; }); future.whenComplete((ret, exception) -> { System.out.println(Thread.currentThread() + "whenComplete exception=>" + exception.getMessage()); }); CompletableFuture exceptionFuture = future.exceptionally((e) -> { System.out.println(Thread.currentThread() + "exceptionally exception=>" + e.getMessage()); return "exception"; }); System.out.println("task future = " + future.get()); System.out.println("handle future = " + newFuture.get()); System.out.println("exception future = " + exceptionFuture.get()); } //输出 Thread[ForkJoinPool.commonPool-worker-9,5,main]exceptionally exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero Thread[main,5,main]whenComplete exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero Thread[ForkJoinPool.commonPool-worker-9,5,main]handle exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at com.example.demo.CompletableFutureTest3.main(CompletableFutureTest3.java:31) Caused by: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:13) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: java.lang.ArithmeticException: / by zero at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:10) ... 6 more