并发编程从入门到放弃系列开始和结束(四)

简介: 对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。

Fork/Join

Fork/Join 是一个并行执行任务的框架,利用的分而治之的思想。

Fork 是把一个大的任务拆分成若干个小任务并行执行,Join 则是合并拆分的子任务的结果集,最终计算出大任务的结果。

所以整个 Fork/Join 的流程可以认为就是两步:

  1. Fork 拆分任务,直到拆分到最小粒度不可拆分为止
  2. Join 计算结果,把每个子任务的结果进行合并

49113cf65ca6548c087b255ce557a82d.jpg这里我们需要介绍一下主要的几个类:

ForkJoinTask:就是我们的分治任务的抽象类

RecursiveTask:继承于 ForkJoinTask,用于计算有返回结果的任务

RecursiveAction: 继承于 ForkJoinTask,用于计算没有返回结果的任务

ForkJoinPool:用于执行 ForkJoinTask 任务的线程池,通常我们可以用 ForkJoinPool.commonPool() 去创建一个 Fork/Join 的线程池,然后用 submit 或者 invoke 去提交执行任务。

这里我们写一个测试程序,用于计算[0,999]的求和结果,所以我们写一个类继承 RecursiveTask ,并且实现他的 compute 方法。

invokeAll() 相当于每个任务都执行 fork,fork 之后会再次执行 compute 判断是否要继续拆分,如果无需拆分那么则使用 join 方法计算汇总结果。

public class ForkJoinTest {
    public static void main(String[] args) throws Exception {
        List<Integer> list = new LinkedList<>();
        Integer sum = 0;
        for (int i = 0; i < 1000; i++) {
            list.add(i);
            sum += i;
        }
        CalculateTask task = new CalculateTask(0, list.size(), list);
        Future<Integer> future = ForkJoinPool.commonPool().submit(task);
        System.out.println("sum=" + sum + ",Fork/Join result=" + future.get());
    }
    @Data
    static class CalculateTask extends RecursiveTask<Integer> {
        private Integer start;
        private Integer end;
        private List<Integer> list;
        public CalculateTask(Integer start, Integer end, List<Integer> list) {
            this.start = start;
            this.end = end;
            this.list = list;
        }
        @Override
        protected Integer compute() {
            Integer sum = 0;
            if (end - start < 200) {
                for (int i = start; i < end; i++) {
                    sum += list.get(i);
                }
            } else {
                int middle = (start + end) / 2;
                System.out.println(String.format("从[%d,%d]拆分为:[%d,%d],[%d,%d]", start, end, start, middle, middle, end));
                CalculateTask task1 = new CalculateTask(start, middle, list);
                CalculateTask task2 = new CalculateTask(middle, end, list);
                invokeAll(task1, task2);
                sum = task1.join() + task2.join();
            }
            return sum;
        }
    }
}
//输出
从[0,1000]拆分为:[0,500],[500,1000]
从[0,500]拆分为:[0,250],[250,500]
从[500,1000]拆分为:[500,750],[750,1000]
从[0,250]拆分为:[0,125],[125,250]
从[250,500]拆分为:[250,375],[375,500]
从[500,750]拆分为:[500,625],[625,750]
从[750,1000]拆分为:[750,875],[875,1000]
sum=499500,Fork/Join result=499500

使用完成之后,我们再来谈一下 Fork/Join 的原理。

先看 fork 的代码,调用 fork 之后,使用workQueue.push() 把任务添加到队列中,注意 push 之后调用 signalWork 唤醒一个线程去执行任务。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
final ForkJoinPool.WorkQueue workQueue; // 工作窃取
 final void push(ForkJoinTask<?> task) {
   ForkJoinTask<?>[] a; ForkJoinPool p;
   int b = base, s = top, n;
   if ((a = array) != null) {    // ignore if queue removed
     int m = a.length - 1;     // fenced write for task visibility
     U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
     U.putOrderedInt(this, QTOP, s + 1);
     if ((n = s - b) <= 1) {
       if ((p = pool) != null)
         p.signalWork(p.workQueues, this);
     }
     else if (n >= m)
       growArray();
   }
}

上面我们看到了 workQueue,这个其实就是我们说的工作队列,它是一个双端队列,并且有一个工作线程和他对应。

@sun.misc.Contended
static final class WorkQueue {
    volatile int base;         // 下一个出队列索引
    int top;                   // 下一个入队列索引
    ForkJoinTask<?>[] array;   // 队列中的 task
    final ForkJoinPool pool;   
    final ForkJoinWorkerThread owner; // 工作队列中的工作线程
    volatile Thread parker;    // == owner during call to park; else null
    volatile ForkJoinTask<?> currentJoin;  // 当前join的任务
    volatile ForkJoinTask<?> currentSteal; // 当前偷到的任务
}

那如果工作线程自己队列的做完了怎么办?只能傻傻地等待吗?并不是,这时候有一个叫做工作窃取的机制,所以他就会去其他线程的队列里偷一个任务来执行。

为了避免偷任务线程和自己的线程产生竞争,所以自己的工作线程是从队列头部获取任务执行,而偷任务线程则从队列尾部偷任务。

a694f6bc98d254898727a0f5344b3064.jpg

工作窃取

Executor

Executor是并发编程中重要的一环,任务创建后提交到Executor执行并最终返回结果。

37282697b886e63838d09a2cc474dbb7.jpg

Executor

任务

线程两种创建方式:Runnable和Callable。

Runnable是最初创建线程的方式,在JDK1.1的版本就已经存在,Callable则在JDK1.5版本之后加入,他们的主要区别在于Callable可以返回任务的执行结果。

任务执行

任务的执行主要靠Executor,ExecutorService继承自Executor,ThreadPoolExecutor和ScheduledThreadPoolExecutor分别实现了ExecutorService。

0a7bb701f703c922598e18669fa25915.jpg

那说到线程池之前,我们肯定要提及到线程池的几个核心参数和原理,这个之前的文章也写到过,属于基础中的基础部分。

首先线程池有几个核心的参数概念:

  1. 最大线程数maximumPoolSize
  2. 核心线程数corePoolSize
  3. 活跃时间keepAliveTime
  4. 阻塞队列workQueue
  5. 拒绝策略RejectedExecutionHandler

当提交一个新任务到线程池时,具体的执行流程如下:

  1. 当我们提交任务,线程池会根据corePoolSize大小创建若干任务数量线程执行任务
  2. 当任务的数量超过corePoolSize数量,后续的任务将会进入阻塞队列阻塞排队
  3. 当阻塞队列也满了之后,那么将会继续创建(maximumPoolSize-corePoolSize)个数量的线程来执行任务,如果任务处理完成,maximumPoolSize-corePoolSize额外创建的线程等待keepAliveTime之后被自动销毁
  4. 如果达到maximumPoolSize,阻塞队列还是满的状态,那么将根据不同的拒绝策略对应处理

2569341dba5c53e428a407ed151485f1.jpg

拒绝策略主要有四种:

  1. AbortPolicy:直接丢弃任务,抛出异常,这是默认策略
  2. CallerRunsPolicy:使用调用者所在的线程来处理任务
  3. DiscardOldestPolicy:丢弃等待队列中最老的任务,并执行当前任务
  4. DiscardPolicy:直接丢弃任务,也不抛出异常

ThreadPoolExecutor

通常为了快捷我们会用Executors工具类提供的创建线程池的方法快速地创建一个线程池出来,主要有几个方法,但是一般我们不推荐这样使用,非常容易导致出现问题,生产环境中我们一般推荐自己实现,参数自己定义,而不要使用这些方法。

创建

//创建固定线程数大小的线程池,核心线程数=最大线程数,阻塞队列长度=Integer.MAX_VALUE
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
//创建只有一个线程的线程池,阻塞队列长度=Integer.MAX_VALUE
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}
//创建核心线程数为0,最大线程数=Integer.MAX_VALUE的线程池,阻塞队列为同步队列
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

最好的办法就是自己创建,并且指定线程名称:

new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), 
Runtime.getRuntime().availableProcessors()*2,
1000L, 
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("thread-name").build());

提交任务

重点说一下几个方法:

submit(Runnable task, T result):可以用于主线程和子线程之间的通信,数据共享。

submit(Runnable task):返回null,相当于调用submit(Runnable task, null)。

invokeAll(Collection<? extends Callable> tasks):批量提交任务,阻塞等待所有任务执行完成之后返回,带超时时间的则是在超时之后返回,并且取消没有执行完成的任务。

invokeAny(Collection<? extends Callable> tasks):批量提交任务,只要一个任务有返回,那么其他的任务都会被终止。

public void execute(Runnable command); //提交runnable任务,无返回
public <T> Future<T> submit(Callable<T> task); //提交callable任务,有返回
public Future<?> submit(Runnable task); //提交runnable,有返回
public <T> Future<T> submit(Runnable task, T result); //提交runnable,有返回
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); //批量提交任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);
public <T> T invokeAny(Collection<? extends Callable<T>> tasks);
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit);

关闭

shutdown:线程池状态设置为SHUTDOWN,不再接受新任务,直接返回,线程池中任务会执行完成,遍历线程池中的线程,逐个调用interrupt方法去中断线程。

shutdownNow:线程池状态设置为STOP,不再接受新任务,直接返回,线程池中任务会被中断,返回值为被丢弃的任务列表。

isShutdown:只要调用了shutdown或者shutdownNow,都会返回true

isTerminating:所有任务都关闭后,才返回true

public void shutdown();
public List<Runnable> shutdownNow();
public boolean isShutdown();
public boolean isTerminating();

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,从名字我们也知道,他是用于定时执行任务的线程池。

内部实现了一个DelayedWorkQueue作为任务的阻塞队列,ScheduledFutureTask 作为调度的任务,保存到队列中。

641566c5b4450a1fa09bdc87573f85bf.jpg

我们先看下他的构造函数,4个构造函数都不支持传队列进来,所以默认的就是使用他的内部类 DelayedWorkQueue,由于 DelayedWorkQueue 是一个无界队列,所以这里最大线程数都是设置的为 Integer.MAX,因为没有意义。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
}

执行定时任务的方法主要有4个,前面两个 schedule 传参区分 Runnable 和 Callable 其实并没有区别,最终 Runnable 会通过 Executors.callable(runnable, result) 转换为 Callable,本质上我们可以当做只有3个执行方法来看。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit);
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

schedule:提交一个延时任务,从时间单位为 unit 的 delay 时间开始执行,并且任务只会执行一次。

scheduleWithFixedDelay:以固定的延迟时间重复执行任务,initialDelay 表示提交任务后多长时间开始执行,delay 表示任务执行时间间隔。

scheduleAtFixedRate:以固定的时间频率重复执行任务,指的是以起始时间开始,然后以固定的时间间隔重复执行任务,initialDelay 表示提交任务后多长时间开始执行,然后从 initialDelay + N * period执行。

这两个特别容易搞混,很难理解到底是个啥意思,记住了。

scheduleAtFixedRate 是上次执行完成之后立刻执行,scheduleWithFixedDelay 则是上次执行完成+delay 后执行

看个例子,两个任务都会延迟1秒,然后以2秒的间隔开始重复执行,任务睡眠1秒的时间。

scheduleAtFixedRate 由于任务执行的耗时比时间间隔小,所以始终是以2秒的间隔在执行。

scheduleWithFixedDelay 因为任务耗时用了1秒,导致后面的时间间隔都成了3秒。

public class ScheduledThreadPoolTest {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
        executorService.scheduleAtFixedRate(() -> {
            try {
                System.out.println("scheduleAtFixedRate=" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
        executorService.scheduleWithFixedDelay(() -> {
            try {
                System.err.println("scheduleWithFixedDelay=" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
//        executorService.shutdown();
    }
}
//输出
scheduleAtFixedRate=01:17:05
scheduleWithFixedDelay=01:17:05
scheduleAtFixedRate=01:17:07
scheduleWithFixedDelay=01:17:08
scheduleAtFixedRate=01:17:09
scheduleAtFixedRate=01:17:11
scheduleWithFixedDelay=01:17:11
scheduleAtFixedRate=01:17:13
scheduleWithFixedDelay=01:17:14
scheduleAtFixedRate=01:17:15
scheduleAtFixedRate=01:17:17
scheduleWithFixedDelay=01:17:17
scheduleAtFixedRate=01:17:19
scheduleWithFixedDelay=01:17:20
scheduleAtFixedRate=01:17:21

我们把任务耗时调整到超过时间间隔,比如改成睡眠3秒,观察输出结果。

scheduleAtFixedRate 由于任务执行的耗时比时间间隔长,按照规定上次任务执行结束之后立刻执行,所以变成以3秒的时间间隔执行。

scheduleWithFixedDelay 因为任务耗时用了3秒,导致后面的时间间隔都成了5秒。

scheduleWithFixedDelay=01:46:21
scheduleAtFixedRate=01:46:21
scheduleAtFixedRate=01:46:24
scheduleWithFixedDelay=01:46:26
scheduleAtFixedRate=01:46:27
scheduleAtFixedRate=01:46:30
scheduleWithFixedDelay=01:46:31
scheduleAtFixedRate=01:46:33
scheduleWithFixedDelay=01:46:36
scheduleAtFixedRate=01:46:36

OK,最后来说说实现原理:

  1. 首先我们通过调用 schedule 的几个方法,把任务添加到 ScheduledThreadPoolExecutor 去执行
  2. 接收到任务之后,会通过请求参数的延迟时间计算出真正需要执行任务的时间,然后把任务封装成 RunnableScheduledFuture
  3. 然后把封装之后的任务添加到延迟队列中,任务 ScheduledFutureTask 实现了 comparable 接口,把时间越小的任务放在队列头,如果时间一样,则会通过 sequenceNumber 去比较,也就是执行时间相同,先提交的先执行
  4. 最后线程池会从延迟队列中去获取任务执行,如果是一次性的任务,执行之后删除队列中的任务,如果是重复执行的,则再次计算时间,然后把任务添加到延迟队列中

c09311597cb51b7d350c4828b90a2e13.jpg

CompletionService

记得上面我将 ThreadPoolExecutor 的方法吗,其中有一个 invokeAny 的方法,批量提交任务,只要有一个完成了,就直接返回,而不用一直傻傻地等,他的实现就是使用了 CompletionService ,我给你看一段源码。

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
}

看到了吧,OK,在我们想试试使用这个类之前,我们先试试 invokeAny 好使不。

public class CompletionServiceTest {
    private static final int TOTAL = 10;
    private static ExecutorService executorService = Executors.newFixedThreadPool(TOTAL);
    public static void main(String[] args) throws Exception {
        testInvokeAny();
    }
    private static void testInvokeAny() throws Exception {
        List<TestTask> taskList = new LinkedList<>();
        for (int i = 0; i < TOTAL; i++) {
            taskList.add(new TestTask(i));
        }
        String value = executorService.invokeAny(taskList, 60, TimeUnit.SECONDS);
        System.out.println("get value = " + value);
        executorService.shutdown();
    }
    static class TestTask implements Callable<String> {
        private Integer index;
        public TestTask(Integer index) {
            this.index = index;
        }
        @Override
        public String call() throws Exception {
            long sleepTime = ThreadLocalRandom.current().nextInt(1000, 10000);
            System.out.println("task-" + index + " sleep " + sleepTime + " Ms");
            Thread.sleep(sleepTime);
            return "task-" + index;
        }
    }
}
//输出
task-7 sleep 3072 Ms
task-4 sleep 1186 Ms
task-3 sleep 6182 Ms
task-9 sleep 7411 Ms
task-0 sleep 1882 Ms
task-1 sleep 8274 Ms
task-2 sleep 4789 Ms
task-5 sleep 8894 Ms
task-8 sleep 7211 Ms
task-6 sleep 5959 Ms
get value = task-4

看到效果了吧,耗时最短的任务返回,整个流程就结束了,那我们试试自己用 CompletionService 来实现这个效果看看。

public static void main(String[] args) throws Exception {
   //        testInvokeAny();
   testCompletionService();
}
private static void testCompletionService() {
    CompletionService<String> completionService = new ExecutorCompletionService(executorService);
    List<Future> taskList = new LinkedList<>();
    for (int i = 0; i < TOTAL; i++) {
        taskList.add(completionService.submit(new TestTask(i)));
    }
    String value = null;
    try {
        for (int i = 0; i < TOTAL; i++) {
            value = completionService.take().get();
            if (value != null) {
                System.out.println("get value = " + value);
                break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        taskList.forEach(task -> {
            task.cancel(true);
        });
    }
    executorService.shutdown();
}
//输出
task-4 sleep 5006 Ms
task-1 sleep 4114 Ms
task-2 sleep 4865 Ms
task-5 sleep 1592 Ms
task-3 sleep 6190 Ms
task-7 sleep 2482 Ms
task-8 sleep 9405 Ms
task-9 sleep 8798 Ms
task-6 sleep 2040 Ms
task-0 sleep 2111 Ms
get value = task-5

效果是一样的,我们只是实现了一个简化版的 invokeAny 功能,使用起来也挺简单的。

实现原理也挺简单的,哪个任务先完成,就把他丢到阻塞队列里,这样取任务结果的时候直接从队列里拿,肯定是拿到最新的那一个。

异步结果

通常,我们都会用 FutureTask 来获取线程异步执行的结果,基于 AQS 实现。

900baaedbb8748e712e843ce3edd169b.jpg

这个没有说太多的必要,看看几个方法就行了。

public V get();
public V get(long timeout, TimeUnit unit);
public boolean cancel(boolean mayInterruptIfRunning);

get 会阻塞的获取线程异步执行的结果,一般不建议直接使用,最好是使用带超时时间的 get 方法。

我们可以通过 cancel 方法去尝试取消任务的执行,参数代表是否支持中断,如果任务未执行,那么可以直接取消,如果任务执行中,使用 cancel(true) 会尝试中断任务。

CompletableFuture

之前我们都在使用 Future,要么只能用 get 方法阻塞,要么就用 isDone 来判断,JDK1.8 之后新增了 CompletableFuture 用于异步编程,它针对 Future 的功能增加了回调能力,可以帮助我们简化异步编程。

CompletableFuture 主要包含四个静态方法去创建对象,主要区别在于 supplyAsync 返回计算结果,runAsync 不返回,另外两个方法则是可以指定线程池,如果不指定线程池则默认使用 ForkJoinPool,默认线程数为CPU核数。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

下面看看他的那些恶心人的几十个方法,我估计能疯。

串行

串行就不用解释了,A->B->C 按照顺序执行,下一个任务必须等上一个任务执行完成才可以。

主要包含 thenApply、thenAccept、thenRun 和 thenCompose,以及他们对应的带 async 的异步方法。

为了方便记忆我们要记住,有 apply 的有传参有返回值,带 accept 的有传参但是没有返回值,带 run 的啥也没有,带 compose 的会返回一个新的 CompletableFuture 实例。

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread() + "工作完成");
            return "supplyAsync";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture newFuture = future.thenApply((ret) -> {
        System.out.println(Thread.currentThread() + "thenApply=>" + ret);
        return "thenApply";
    }).thenAccept((ret) -> {
        System.out.println(Thread.currentThread() + "thenAccept=>" + ret);
    }).thenRun(() -> {
        System.out.println(Thread.currentThread() + "thenRun");
    });
    CompletableFuture<String> composeFuture = future.thenCompose((ret) -> {
        System.out.println(Thread.currentThread() + "thenCompose=>" + ret);
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread() + "thenCompose工作完成");
                return "thenCompose";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    });
    System.out.println(future.get());
    System.out.println(newFuture.get());
    System.out.println(composeFuture.get());
}
//输出
Thread[ForkJoinPool.commonPool-worker-9,5,main]工作完成
Thread[ForkJoinPool.commonPool-worker-9,5,main]thenCompose=>supplyAsync
Thread[main,5,main]thenApply=>supplyAsync
Thread[main,5,main]thenAccept=>thenApply
Thread[main,5,main]thenRun
supplyAsync
null
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCompose工作完成
thenCompose

AND 聚合

这个意思是下一个任务执行必须等前两个任务完成可以。

主要包含 thenCombine、thenAcceptBoth、runAfterBoth ,以及他们对应的带 async 的异步方法,区别和上面一样。

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread() + "A工作完成");
            return "A";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            System.out.println(Thread.currentThread() + "B工作完成");
            return "B";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture newFuture = future.thenCombine(future2, (ret1, ret2) -> {
        System.out.println(Thread.currentThread() + "thenCombine=>" + ret1 + "," + ret2);
        return "thenCombine";
    }).thenAcceptBoth(future2, (ret1, ret2) -> {
        System.out.println(Thread.currentThread() + "thenAcceptBoth=>" + ret1 + "," + ret2);
    }).runAfterBoth(future2, () -> {
        System.out.println(Thread.currentThread() + "runAfterBoth");
    });
    System.out.println(future.get());
    System.out.println(future2.get());
    System.out.println(newFuture.get());
}
//输出
Thread[ForkJoinPool.commonPool-worker-9,5,main]A工作完成
A
Thread[ForkJoinPool.commonPool-worker-2,5,main]B工作完成
B
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenCombine=>A,B
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptBoth=>thenCombine,B
Thread[ForkJoinPool.commonPool-worker-2,5,main]runAfterBoth
null

Or 聚合

Or 聚合代表只要多个任务中有一个完成了,就可以继续下面的任务。

主要包含 applyToEither、acceptEither、runAfterEither ,以及他们对应的带 async 的异步方法,区别和上面一样,不再举例了。

回调/异常处理

whenComplete、handle 代表执行完成的回调,一定会执行,exceptionally 则是任务执行发生异常的回调。

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
            int a = 1 / 0;
            return "success";
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
    CompletableFuture newFuture = future.handle((ret, exception) -> {
        System.out.println(Thread.currentThread() + "handle exception=>" + exception.getMessage());
        return "handle";
    });
    future.whenComplete((ret, exception) -> {
        System.out.println(Thread.currentThread() + "whenComplete exception=>" + exception.getMessage());
    });
    CompletableFuture exceptionFuture = future.exceptionally((e) -> {
        System.out.println(Thread.currentThread() + "exceptionally exception=>" + e.getMessage());
        return "exception";
    });
    System.out.println("task future = " + future.get());
    System.out.println("handle future = " + newFuture.get());
    System.out.println("exception future = " + exceptionFuture.get());
}
//输出
Thread[ForkJoinPool.commonPool-worker-9,5,main]exceptionally exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Thread[main,5,main]whenComplete exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Thread[ForkJoinPool.commonPool-worker-9,5,main]handle exception=>java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at com.example.demo.CompletableFutureTest3.main(CompletableFutureTest3.java:31)
Caused by: java.lang.RuntimeException: java.lang.ArithmeticException: / by zero
 at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:13)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
 at com.example.demo.CompletableFutureTest3.lambda$main$0(CompletableFutureTest3.java:10)
 ... 6 more


相关文章
|
缓存 安全 Java
温故知新-并发编程篇
温故知新-并发编程篇
61 0
|
3月前
|
安全 程序员 Go
深入浅出Go语言的并发之道
在本文中,我们将探索Go语言如何优雅地处理并发编程。通过对比传统多线程模型,我们将揭示Go语言独特的goroutine和channel机制是如何简化并发编程,并提高程序的效率和稳定性。本文不涉及复杂的技术术语,而是用通俗易懂的语言,结合生动的比喻,让读者能够轻松理解Go语言并发编程的核心概念。
|
算法 编译器 程序员
[笔记]C++并发编程实战 《一》你好,C++的并发世界(二)
[笔记]C++并发编程实战 《一》你好,C++的并发世界(二)
|
安全 程序员 API
[笔记]C++并发编程实战 《一》你好,C++的并发世界(一)
[笔记]C++并发编程实战 《一》你好,C++的并发世界
|
存储 缓存 算法
并发编程基础
并发编程基础
|
缓存 安全 Java
【并发编程】JAVA并发编程面试题合集
【并发编程】JAVA并发编程面试题合集
【并发编程】JAVA并发编程面试题合集
|
Java 调度
并发编程从入门到放弃系列开始和结束(一)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
1646 3
并发编程从入门到放弃系列开始和结束(一)
|
缓存 安全 Java
并发编程从入门到放弃系列开始和结束(二)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
并发编程从入门到放弃系列开始和结束(二)
|
存储 Java
并发编程从入门到放弃系列开始和结束(五)
对于 Java 部分的面试来说,突然想到并发这一块的内容是不太完整的,这篇文章会通篇把多线程和并发都大致阐述一遍,至少能够达到了解原理和使用的目的,内容会比较多,从最基本的线程到我们常用的类会统一说一遍,慢慢看。
并发编程从入门到放弃系列开始和结束(五)

相关课程

更多