前文
开启线程执行任务,不管是使用Runnable(无返回值不支持上报异常)还是Callable(有返回值支持上报异常)接口,都可以轻松实现。那么如果是开启线程池并需要获取结果归集的情况下,如何实现,以及优劣?
本文将分别以这四种方式解决归集的问题,然后看看效率和使用的方便程度即可
1、Futrue
Future接口封装了取消,获取线程结果,以及状态判断是否取消,是否完成这几个方法,都很有用。
Demo:
使用线程池提交Callable接口任务,返回Future接口,添加进list,最后遍历该List且内部使用while轮询,并发获取结果,代码如下
/** * 使用Futrue来实现多线程执行归集操作 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:02 */ public class FutureDemo { public static void main(String[] args) { Long start = Instant.now().toEpochMilli(); //定义一个线程池 方便开启和执行多线程 此处为了方便,直接使用 newFixedThreadPool ExecutorService exs = Executors.newFixedThreadPool(10); //结果集 装载在list里面 List<Integer> list = new ArrayList<>(); List<Future<Integer>> futureList = new ArrayList<>(); try { //1.高速提交10个任务,每个任务返回一个Future入futureList 装载起来 这样10个线程就并行去处理和计算了 for (int i = 0; i < 10; i++) { futureList.add(exs.submit(new CallableTask(i + 1))); } Long getResultStart = Instant.now().toEpochMilli(); System.out.println("结果归集开始时间=" + LocalDateTime.now()); //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除 while (futureList.size() > 0) { Iterator<Future<Integer>> iterable = futureList.iterator(); //遍历 轮询 while (iterable.hasNext()) { Future<Integer> future = iterable.next(); //如果任务完成就立马取结果,并且,并且把该任务直接从futureList移除掉 否则判断下一个任务是否完成 if (future.isDone() && !future.isCancelled()) { //获取结果 Integer i = future.get(); System.out.println("任务i=" + i + "获取完成,移出任务队列!" + LocalDateTime.now()); //把结果装入进去 然后把futrue任务移除 list.add(i); iterable.remove(); } else { Thread.sleep(1);//避免CPU高速运转(这就是轮询的弊端),这里休息1毫秒,CPU纳秒级别 } } } System.out.println("list=" + list); //任务的处理结果 System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } // 任务 采用sleep模拟处理任务需要消耗的时间 static class CallableTask implements Callable<Integer> { Integer i; //用来编号任务 方便日志里输出识别 public CallableTask(Integer i) { super(); this.i = i; } @Override public Integer call() throws Exception { if (i == 1) { Thread.sleep(3000);//任务1耗时3秒 } else if (i == 5) { Thread.sleep(5000);//任务5耗时5秒 } else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + LocalDateTime.now()); return i; } } }
如上图,开启定长为10的线程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任务1耗时3秒,任务5耗时5秒,其他1秒。控制台打印如下:
结果归集开始时间=2018-10-31T11:01:19.457 task线程:pool-1-thread-2任务i=2,完成!2018-10-31T11:01:19.976 task线程:pool-1-thread-4任务i=4,完成!2018-10-31T11:01:19.977 task线程:pool-1-thread-3任务i=3,完成!2018-10-31T11:01:19.977 任务i=4获取完成,移出任务队列!2018-10-31T11:01:19.978 task线程:pool-1-thread-9任务i=9,完成!2018-10-31T11:01:19.978 task线程:pool-1-thread-8任务i=8,完成!2018-10-31T11:01:19.978 task线程:pool-1-thread-7任务i=7,完成!2018-10-31T11:01:19.978 task线程:pool-1-thread-6任务i=6,完成!2018-10-31T11:01:19.978 任务i=6获取完成,移出任务队列!2018-10-31T11:01:19.979 任务i=7获取完成,移出任务队列!2018-10-31T11:01:19.979 task线程:pool-1-thread-10任务i=10,完成!2018-10-31T11:01:19.979 任务i=8获取完成,移出任务队列!2018-10-31T11:01:19.979 任务i=9获取完成,移出任务队列!2018-10-31T11:01:19.979 任务i=10获取完成,移出任务队列!2018-10-31T11:01:19.979 任务i=2获取完成,移出任务队列!2018-10-31T11:01:19.980 任务i=3获取完成,移出任务队列!2018-10-31T11:01:19.980 task线程:pool-1-thread-1任务i=1,完成!2018-10-31T11:01:21.964 任务i=1获取完成,移出任务队列!2018-10-31T11:01:21.965 task线程:pool-1-thread-5任务i=5,完成!2018-10-31T11:01:23.977 任务i=5获取完成,移出任务队列!2018-10-31T11:01:23.979 list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5] 总耗时=5070,取结果归集耗时=5037
看最后的两个结果输出:
list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]--》多执行几遍,最后2个总是1,5最后加进去的,可实现按照任务完成先后顺序获取结果! 因为1需要3s,5需要5s是最慢的,所以最后进入list 总耗时=5046,取结果归集耗时=5040 ---》符合逻辑,10个任务,定长10线程池,其中一个任务耗时3秒,一个任务耗时5秒,由于并发高速轮训,耗时取最长5秒
建议:此种方法可实现基本目标,任务并行且按照完成顺序获取结果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用!
2、FutureTask
FutureTask是接口RunnableFuture的唯一实现类(实现了Future+Runnable).
1.Runnable接口,可开启单个线程执行。
2.Future接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。
demo:
demo1:两个步骤:1.开启单个线程执行任务,2.阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑
/** * FutureTask弥补了Future必须用线程池提交返回Future的缺陷,实现功能如下: * 这两个步骤:一个开启线程执行任务,一个阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑。 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:15 */ public class FutureTaskContorlDemo { public static void main(String[] args) { try { System.out.println("=====例如一个统计公司总部和分部的总利润是否达标100万=========="); //利润 记录总公司的利润综合 Integer count = 0; //1.定义一个futureTask,假设去远程http获取各个分公司业绩(任务都比较耗时). FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask()); Thread futureTaskThread = new Thread(futureTask); futureTaskThread.start(); System.out.println("futureTaskThread start!" + new Date()); //2.主线程先做点别的事 System.out.println("主线程查询总部公司利润开始时间:" + new Date()); Thread.sleep(5000); count += 10; //10表示北京集团总部利润。 System.out.println("主线程查询总部公司利润结果时间:" + new Date()); //总部已达标100万利润,就不再继续执行获取分公司业绩任务了 if (count >= 100) { System.out.println("总部公司利润达标,取消futureTask!" + new Date()); futureTask.cancel(true);//不需要再去获取结果,那么直接取消即可 } else { System.out.println("总部公司利润未达标,进入阻塞查询分公司利润!" + new Date()); //3总部未达标.阻塞获取,各个分公司结果 然后分别去获取分公司的利润 Integer i = futureTask.get();//真正执行CallableTask System.out.println("i=" + i + "获取到结果!" + new Date()); } } catch (Exception e) { e.printStackTrace(); } } // 模拟一个十分耗时的任务 去所有的分公司里去获取利润结果 static class CallableTask implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("CallableTask-call,查询分公司利润,执行开始!" + new Date()); Thread.sleep(10000); System.out.println("CallableTask-call,查询分公司利润,执行完毕!" + new Date()); return 10; } } }
FutureTask这个任务,是Thread.start()的时候就开始执行了的。而结果是.get()的时候才会给你(如果提前完成,结果也会先给你缓存在对象内喽。否则get就会阻塞直到有结果了)
注意:倘若你的任务里抛出了异常。那么get方法就会报错从而中断主线程(相当于不需要返回值的异步执行嘛~),但是但是但是,如果你不调用get方法,主线程是不会中断的。
输出:
=====例如一个统计公司总部和分部的总利润是否达标100万========== futureTaskThread start!Wed Oct 31 11:21:33 CST 2018 主线程查询总部公司利润开始时间:Wed Oct 31 11:21:33 CST 2018 CallableTask-call,查询分公司利润,执行开始!Wed Oct 31 11:21:33 CST 2018 主线程查询总部公司利润结果时间:Wed Oct 31 11:21:38 CST 2018 总部公司利润未达标,进入阻塞查询分公司利润!Wed Oct 31 11:21:38 CST 2018 CallableTask-call,查询分公司利润,执行完毕!Wed Oct 31 11:21:43 CST 2018 i=10获取到结果!Wed Oct 31 11:21:43 CST 2018
如上,分离之后,futureTaskThread耗时10秒期间,主线程还穿插的执行了耗时5秒的操作,大大减小总耗时。且可根据业务逻辑实时判断是否需要继续执行futureTask。
Demo2:FutureTask一样可以并发执行任务并获取结果,如下:
/** * FutureTask实现多线程并发执行任务并取结果归集 * * @author fangshixiang@vipkid.com.cn * @description // * @date 2018/10/31 11:26 */ public class FutureTaskDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); ExecutorService exs = Executors.newFixedThreadPool(10); //结果集 List<Integer> list = new ArrayList<>(); List<FutureTask<Integer>> futureList = new ArrayList<>(); try { //启动线程池 和上面Futrue对比,只有这块有点不一样 for (int i = 0; i < 10; i++) { FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask(i + 1)); //提交任务,添加返回,Runnable特性 exs.submit(futureTask); //Future特性 提交任务后 把futureTask添加进futureList futureList.add(futureTask); } Long getResultStart = System.currentTimeMillis(); System.out.println("结果归集开始时间=" + new Date()); //结果归集 while (futureList.size() > 0) { Iterator<FutureTask<Integer>> iterable = futureList.iterator(); //遍历一遍 while (iterable.hasNext()) { Future<Integer> future = iterable.next(); if (future.isDone() && !future.isCancelled()) { //Future特性 Integer i = future.get(); System.out.println("任务i=" + i + "获取完成,移出任务队列!" + new Date()); list.add(i); //任务完成移除任务 iterable.remove(); } else { //避免CPU高速轮循,可以休息一下。 Thread.sleep(1); } } } System.out.println("list=" + list); System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } static class CallableTask implements Callable<Integer> { Integer i; public CallableTask(Integer i) { super(); this.i = i; } @Override public Integer call() throws Exception { if (i == 1) { Thread.sleep(3000);//任务1耗时3秒 } else if (i == 5) { Thread.sleep(5000);//任务5耗时5秒 } else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!" + new Date()); return i; } } }
输出:
结果归集开始时间=Wed Oct 31 11:26:41 CST 2018 task线程:pool-1-thread-8任务i=8,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-7任务i=7,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-6任务i=6,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-4任务i=4,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-3任务i=3,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-2任务i=2,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-10任务i=10,完成!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-9任务i=9,完成!Wed Oct 31 11:26:42 CST 2018 任务i=8获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=9获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=10获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=2获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=3获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=4获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=6获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 任务i=7获取完成,移出任务队列!Wed Oct 31 11:26:42 CST 2018 task线程:pool-1-thread-1任务i=1,完成!Wed Oct 31 11:26:44 CST 2018 任务i=1获取完成,移出任务队列!Wed Oct 31 11:26:44 CST 2018 task线程:pool-1-thread-5任务i=5,完成!Wed Oct 31 11:26:46 CST 2018 任务i=5获取完成,移出任务队列!Wed Oct 31 11:26:46 CST 2018 list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5] 总耗时=5066,取结果归集耗时=5058
建议:
demo1在特定场合例如有十分耗时的业务但有依赖于其他业务不一定非要执行的,可以尝试使用。
demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使用。