前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。
前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS:
publicVget(longtimeout, TimeUnitunit) throwsInterruptedException, ExecutionException, TimeoutException { if (unit==null) thrownewNullPointerException(); ints=state; if (s<=COMPLETING&& (s=awaitDone(true, unit.toNanos(timeout))) <=COMPLETING) thrownewTimeoutException(); returnreport(s); }
可以看到awaitDone(true, unit.toNanos(timeout)))方法:
/*** Awaits completion or aborts on interrupt or timeout.** @param timed true if use timed waits* @param nanos time to wait, if timed* @return state upon completion*/privateintawaitDone(booleantimed, longnanos) throwsInterruptedException { finallongdeadline=timed?System.nanoTime() +nanos : 0L; WaitNodeq=null; booleanqueued=false; for (;;) { //进行自旋if (Thread.interrupted()) { removeWaiter(q); thrownewInterruptedException(); } ints=state; //进行状态匹配if (s>COMPLETING) { if (q!=null) q.thread=null; returns; } elseif (s==COMPLETING) // cannot time out yetThread.yield(); elseif (q==null) q=newWaitNode(); elseif (!queued) queued=UNSAFE.compareAndSwapObject(this, waitersOffset, q.next=waiters, q); elseif (timed) { nanos=deadline-System.nanoTime(); if (nanos<=0L) { removeWaiter(q); returnstate; } LockSupport.parkNanos(this, nanos); } elseLockSupport.park(this); } }
还有其run方法:
publicvoidrun() {//采用casif (state!=NEW||!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V>c=callable; if (c!=null&&state==NEW) { Vresult; booleanran; try { result=c.call(); ran=true; } catch (Throwableex) { result=null; ran=false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to// prevent concurrent calls to run()runner=null; // state must be re-read after nulling runner to prevent// leaked interruptsints=state; if (s>=INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
也即可以看到jdk1.6之后,FutureTask采用park和cas来实现的。
下面来学习CompletionService。
如果向Executor提交一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的future,然后返回使用get方法,同时将timeout指定为0,从而通过轮询的方式来判断任务是否完成,但是这样有些繁琐。因此可以采用CompletionService,其将Executor和BlockingQueue的功能融合在一起,可以采用队列操作take、poll获取已完结的结果。--摘自《并发编程实战》
/*** @description: CompletionSerivce使用* <p>* 采用异步的方式一边处理新的任务,一边处理完成任务的结果* 也就是说在处理多个任务时,可以实现先处理的任务,先拿到结果* 采用 submit+take,不至于在一个任务没有完成的情况下,其余的结果不能处理* 你可以将其理解成Executor+BlockingQueue的结合体,此时你可以使用其实现* ExecutorCompletionService,进行异构并行* </p>* @author: lyz* @date: 2020/05/24 22:02**/publicclassCompletionServiceTest { publicstaticvoidmain(String[] args) { Longstart=System.currentTimeMillis(); //开启5个线程ExecutorServiceexs=Executors.newFixedThreadPool(4); try { inttaskCount=10; // 结果集List<Integer>list=newArrayList<Integer>(); List<Future<Integer>>futureList=newArrayList<Future<Integer>>(); // 1.定义CompletionServiceCompletionService<Integer>completionService=newExecutorCompletionService<Integer>(exs); // 2.添加任务,需要执行的业务for (inti=0; i<taskCount; i++) { Future<Integer>future=completionService.submit(newTask(i+1)); futureList.add(future); } // 3.获取结果for (inti=0; i<taskCount; i++) { Integerresult=completionService.take().get(); System.out.println("任务i=="+result+"完成!"+newDate()); list.add(result); } System.out.println("list="+list); } catch (Exceptione) { e.printStackTrace(); } finally { //关闭线程池exs.shutdown(); } } //实现Callable接口,重写call方法staticclassTaskimplementsCallable<Integer> { Integeri; publicTask(Integeri) { super(); this.i=i; } publicIntegercall() throwsException { if (i==4) { Thread.sleep(5000); } else { Thread.sleep(1000); } System.out.println("线程:"+Thread.currentThread().getName() +"任务i="+i+",执行完成!"); returni; } } }
运行结果:
线程:pool-1-thread-2任务i=2,执行完成!线程:pool-1-thread-1任务i=1,执行完成!线程:pool-1-thread-3任务i=3,执行完成!任务i==2完成!SunMay2422:50:01CST2020任务i==1完成!SunMay2422:50:01CST2020任务i==3完成!SunMay2422:50:01CST2020线程:pool-1-thread-2任务i=5,执行完成!任务i==5完成!SunMay2422:50:02CST2020线程:pool-1-thread-1任务i=6,执行完成!线程:pool-1-thread-3任务i=7,执行完成!任务i==6完成!SunMay2422:50:02CST2020任务i==7完成!SunMay2422:50:02CST2020线程:pool-1-thread-2任务i=8,执行完成!任务i==8完成!SunMay2422:50:03CST2020线程:pool-1-thread-1任务i=9,执行完成!线程:pool-1-thread-3任务i=10,执行完成!任务i==9完成!SunMay2422:50:03CST2020任务i==10完成!SunMay2422:50:03CST2020线程:pool-1-thread-4任务i=4,执行完成!任务i==4完成!SunMay2422:50:05CST2020list=[2, 1, 3, 5, 6, 7, 8, 9, 10, 4]
可以看到其与FutureTask相比,运行的结果是不相同的,其返回结果的线程是乱序的,同时是先执行完的,先返回结果,不同于FutureTask的顺序返回。同时性能上优于FutureTask。当然,在JDK8中,我们可以是一个更为强大的CompletableFuture来实现异构应用。