一、说明
Future的不足
- 当通过
.get()
方法获取线程的返回值时,会导致阻塞
- 也就是和当前这个Future关联的计算任务真正执行完成的时候才返回结果
- 新任务必须等待已完成任务的结果才能继续进行处理,会浪费很多时间,最好是谁最先执行完成谁最先返回
CompletionService的引入
- 解决阻塞的问题
- 以异步的方式一边处理新的线程任务,一边处理已完成任务的结果,将执行任务与处理任务分开进行处理
二、理解
CompletionService
java.util.concurrent
包下CompletionService<V>
接口,但并不继承Executor
接口,仅有一个实现类ExecutorCompletionService
用于管理线程对象
- 更加有效地处理Future的返回值,避免阻塞,使用
.submit()
方法执行任务,使用.take()
取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果
public interface CompletionService<V> { Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
submit()
方法用来执行线程任务
take()
方法从队列中获取完成任务的Future对象,谁最先执行完成谁最先返回,获取到的对象再调用.get()
方法获取结果
poll()
方法获取并删除代表下一个已完成任务的 Future,如果不存在,则返回null,此无阻塞的效果
poll(long timeout, TimeUnit unti)
timeout表示等待的最长时间,unit表示时间单位,在指定时间内还没获取到结果,则返回null
ExecutorCompletionService
java.util.concurrent
包下ExecutorCompletionService<V>
类实现CompletionService<V>
接口,方法与接口相同
- 比
ExecutorService
可以更精确和简便地完成异步任务的执行
executor
执行任务,completionQueue
保存异步任务执行的结果
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; …… Future<V> submit(Callable<V> task) Future<V> submit(Runnable task, V result) Future<V> take() throws InterruptedException Future<V> poll() Future<V> poll(long timeout, TimeUnit unit) …… }
completionQueue
初始化了一个LinkedBlockingQueue
类型的先进先出阻塞队列
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
submit()
方法中QueueingFuture
是ExecutorCompletionService
中的内部类
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture<V>(f, completionQueue)); return f; }
QueueingFuture
将RunnableFuture
实例对象赋值给了task
,内部的done()
方法将task
添加到已完成阻塞队列中,调用take()
或poll()
方法获取已完成的Future
private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; protected void done() { completionQueue.add(task); } }
三、实现
1.使用Future
创建CompletionServiceDemo
类,创建好的线程对象,使用Executors
工厂类来创建ExecutorService
的实例(即线程池),通过ThreadPoolExecutor
的.submit()
方法提交到线程池去执行,线程执行后,返回值Future可被拿到
public class CompletionServiceDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 2.创建Callable子线程对象任务 Callable callable_1 = new Callable() { @Override public String call() throws Exception { Thread.sleep(5000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; // 3.使用Future提交三个任务到线程池 Future future_1 = executorService.submit(callable_1); Future future_2 = executorService.submit(callable_2); Future future_3 = executorService.submit(callable_3); // 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); System.out.println(future_1.get() + "" + getStringDate()); System.out.println(future_2.get() + "" + getStringDate()); System.out.println(future_3.get() + "" + getStringDate()); System.out.println("结束 " + getStringDate()); // 5.关闭线程池 executorService.shutdown(); } // 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; } }
future_1.get()
会等待执行时间阻塞5秒再获取到结果,而在这5秒内future_2
和future_3
的任务已完成,所以会立马得到结果
2.使用ExecutorCompletionService
创建一个ExecutorCompletionService
放入线程池实现CompletionService
接口,将创建好的线程对象通过CompletionService
提交任务和获取结果
public class CompletionServiceDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口 CompletionService completionService = new ExecutorCompletionService(executorService); // 3.创建Callable子线程对象任务 Callable callable_1 = new Callable() { @Override public String call() throws Exception { Thread.sleep(5000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; // 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3); // 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println(completionService.take().get() + "" + getStringDate()); System.out.println("结束 " + getStringDate()); // 5.关闭线程池 executorService.shutdown(); } // 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; } }
提交顺序是1-2-3,按照完成这些任务的时间顺序处理它们的结果,返回顺序是3-2-1
3.take()方法
take()
方法从队列中获取完成任务的Future对象,会阻塞,一直等待线程池中返回一个结果,谁最先执行完成谁最先返回,获取到的对象再调用.get()
方法获取结果
如果调用take()
方法的次数大于任务数,会因为等不到有任务返回结果而阻塞,只有三个任务,第四次take等不到结果而阻塞
4.poll()方法
poll()
方法不会去等结果造成阻塞,没有结果则返回null,接着程序继续往下运行
直接用completionService.poll().get()
会引发 NullPointerException
创建一个循环,连续调用poll()
方法,每次隔1秒调用,没有结果则返回null
public class CompletionServiceDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口 CompletionService completionService = new ExecutorCompletionService(executorService); // 3.创建Callable子线程对象任务 Callable callable_1 = new Callable() { @Override public String call() throws Exception { Thread.sleep(5000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; // 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3); // 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); // 5.创建一个循环,连续调用poll()方法,间隔1秒 for (int i = 0; i < 8; i++) { Future future = completionService.poll(); if (future!=null){ System.out.println(future.get() + getStringDate()); }else { System.out.println(future+" "+getStringDate()); } Thread.sleep(1000); } System.out.println("结束 " + getStringDate()); // 6.关闭线程池 executorService.shutdown(); } // 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; } }
5.poll(long timeout, TimeUnit unit)方法
poll(long timeout, TimeUnit unit)
方法设置了等待时间,等待超时还没有结果就返回null
不使用 Thread.sleep(1000)
,将等待时间设置成0.5秒,由于只有8次循环,也就是4秒执行时间,而callable_1
需要执行5秒,获取不到结果则返回null
public class CompletionServiceDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.创建线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口 CompletionService completionService = new ExecutorCompletionService(executorService); // 3.创建Callable子线程对象任务 Callable callable_1 = new Callable() { @Override public String call() throws Exception { Thread.sleep(5000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_2 = new Callable() { @Override public String call() throws Exception { Thread.sleep(3000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; Callable callable_3 = new Callable() { @Override public String call() throws Exception { Thread.sleep(1000); return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " ); } }; // 3.使用CompletionService提交三个任务到线程池 completionService.submit(callable_1); completionService.submit(callable_2); completionService.submit(callable_3); // 4.获取返回值 System.out.println("开始获取结果 " + getStringDate()); // 5.创建一个循环,连续调用poll()方法,间隔1秒 for (int i = 0; i < 8; i++) { Future future = completionService.poll(500, TimeUnit.MILLISECONDS); if (future!=null){ System.out.println(future.get() + getStringDate()); }else { System.out.println(future+" "+getStringDate()); } } System.out.println("结束 " + getStringDate()); // 6.关闭线程池 executorService.shutdown(); } // 获取时间函数 public static String getStringDate() { Date currentTime = new Date(); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); String date = simpleDateFormat.format(currentTime); return date; } }