Demo:
JDK8的CompletableFuture 自带多任务组合方法allOf和anyOf
- allOf是等待所有任务完成,构造后CompletableFuture完成
- anyOf是只要有一个任务完成,构造后CompletableFuture就完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); } public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { return orTree(cfs, 0, cfs.length - 1); }
方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
/** * 多线程并发任务,取结果归集 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:53 */ public class CompletableFutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //定长10线程池 ExecutorService exs = Executors.newFixedThreadPool(10); //结果集 List<String> list = new ArrayList<>(); List<String> list2 = new ArrayList<>(); List<CompletableFuture<String>> futureList = new ArrayList<>(); final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10); try { //方式一:循环创建CompletableFuture list, 然后组装 组装返回一个有返回值的CompletableFuture,返回结果get()获取 for (int i = 0; i < taskList.size(); i++) { final int j = i; //异步执行 拿到每个有返回值的CompletableFuture对象 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> calc(taskList.get(j)), exs) //Integer转换字符串 thenAccept只接受不返回不影响结果 .thenApply(e -> Integer.toString(e)) //如需获取任务完成先后顺序,此处代码即可 .whenComplete((v, e) -> { System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date()); list2.add(v); }); futureList.add(future); } //流式获取结果:此处是根据任务添加顺序获取的结果======================== //1.构造一个空CompletableFuture,子任务数为入参任务list size CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.stream() .filter(f -> f != null).collect(toList()).toArray(new CompletableFuture[futureList.size()])); //2.流式(总任务完成后,每个子任务join取结果,后转换为list) list = allDoneFuture.thenApply(v -> futureList.stream().map(CompletableFuture::join).collect(toList())).get(); //流式获取结果:此处是根据任务添加顺序获取的结果======================== System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } //模拟任务的耗时方法 public static Integer calc(Integer i) { try { if (i == 1) { //任务1耗时3秒 Thread.sleep(3000); } else if (i == 5) { //任务5耗时5秒 Thread.sleep(5000); } else { //其它任务耗时1秒 Thread.sleep(1000); } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } return i; } }
输出:
task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:27:24 CST 2018 任务2完成!result=2,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:27:24 CST 2018 任务4完成!result=4,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:27:24 CST 2018 任务3完成!result=3,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:27:24 CST 2018 任务6完成!result=6,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:27:24 CST 2018 任务8完成!result=8,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:27:24 CST 2018 任务7完成!result=7,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:27:24 CST 2018 任务10完成!result=10,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:27:24 CST 2018 任务9完成!result=9,异常 e=null,Wed Oct 31 12:27:24 CST 2018 task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:27:26 CST 2018 任务1完成!result=1,异常 e=null,Wed Oct 31 12:27:26 CST 2018 task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:27:28 CST 2018 任务5完成!result=5,异常 e=null,Wed Oct 31 12:27:28 CST 2018 任务完成先后顺序,结果list2=[2, 4, 3, 6, 8, 7, 10, 9, 1, 5];任务提交顺序,结果list=[2, 1, 3, 4, 5, 6, 7, 8, 9, 10],耗时=5141
方式二:全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。—》推荐
/** * 多线程并发任务,取结果归集 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:53 */ public class CompletableFutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //定长10线程池 ExecutorService exs = Executors.newFixedThreadPool(10); //结果集 List<String> list = new ArrayList<>(); List<String> list2 = new ArrayList<>(); final List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10); try { //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取 CompletableFuture<Integer>[] cfs = taskList.stream().map(i -> //把计算任务 交给CompletableFuture异步去处理执行 CompletableFuture.supplyAsync(() -> calc(i), exs) // 把计算完成结果做Function处理:此处是转换成了字符串 .thenApply(h -> Integer.toString(h)) //如需获取任务完成先后顺序,此处代码即可 会先处理先完成的任务 后处理后完成的任务 使用起来比CompletionService确实方便不少 .whenComplete((v, e) -> { System.out.println("任务" + v + "完成!result=" + v + ",异常 e=" + e + "," + new Date()); list2.add(v); })).toArray(CompletableFuture[]::new); //此处直接toArray 不toList了 //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取 此处使用join来获取结果 CompletableFuture.allOf(cfs).join(); System.out.println("任务完成先后顺序,结果list2=" + list2 + ";任务提交顺序,结果list=" + list + ",耗时=" + (System.currentTimeMillis() - start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } //模拟任务的耗时方法 public static Integer calc(Integer i) { try { if (i == 1) { //任务1耗时3秒 Thread.sleep(3000); } else if (i == 5) { //任务5耗时5秒 Thread.sleep(5000); } else { //其它任务耗时1秒 Thread.sleep(1000); } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!+" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } return i; } }
输出:
task线程:pool-1-thread-7任务i=7,完成!+Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-3任务i=3,完成!+Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-4任务i=4,完成!+Wed Oct 31 12:19:41 CST 2018 任务4完成!result=4,异常 e=null,Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-8任务i=8,完成!+Wed Oct 31 12:19:41 CST 2018 任务8完成!result=8,异常 e=null,Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-9任务i=9,完成!+Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-1任务i=2,完成!+Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-10任务i=10,完成!+Wed Oct 31 12:19:41 CST 2018 任务2完成!result=2,异常 e=null,Wed Oct 31 12:19:41 CST 2018 任务10完成!result=10,异常 e=null,Wed Oct 31 12:19:41 CST 2018 任务3完成!result=3,异常 e=null,Wed Oct 31 12:19:41 CST 2018 任务7完成!result=7,异常 e=null,Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-6任务i=6,完成!+Wed Oct 31 12:19:41 CST 2018 任务9完成!result=9,异常 e=null,Wed Oct 31 12:19:41 CST 2018 任务6完成!result=6,异常 e=null,Wed Oct 31 12:19:41 CST 2018 task线程:pool-1-thread-2任务i=1,完成!+Wed Oct 31 12:19:43 CST 2018 任务1完成!result=1,异常 e=null,Wed Oct 31 12:19:43 CST 2018 task线程:pool-1-thread-5任务i=5,完成!+Wed Oct 31 12:19:45 CST 2018 任务5完成!result=5,异常 e=null,Wed Oct 31 12:19:45 CST 2018 任务完成先后顺序,结果list2=[4, 8, 2, 10, 3, 7, 9, 6, 1, 5];任务提交顺序,结果list=[],耗时=5166 ---》符合逻辑,10个任务,10个线程并发执行,其中任务1耗时3秒,任务5耗时5秒,耗时取最大值。
建议:CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。
总结
本文从原理、demo、建议三个方向分析了常用多线程并发,取结果归集的几种实现方案,希望对大家有所启发,整理表格如下: