CompletionService学习

简介: 前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS

前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。

前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS:

publicVget(longtimeout, TimeUnitunit)
throwsInterruptedException, ExecutionException, TimeoutException {
if (unit==null)
thrownewNullPointerException();
ints=state;
if (s<=COMPLETING&&        (s=awaitDone(true, unit.toNanos(timeout))) <=COMPLETING)
thrownewTimeoutException();
returnreport(s);
}

可以看到awaitDone(true, unit.toNanos(timeout)))方法:

/*** Awaits completion or aborts on interrupt or timeout.** @param timed true if use timed waits* @param nanos time to wait, if timed* @return state upon completion*/privateintawaitDone(booleantimed, longnanos)
throwsInterruptedException {
finallongdeadline=timed?System.nanoTime() +nanos : 0L;
WaitNodeq=null;
booleanqueued=false;
for (;;) {  //进行自旋if (Thread.interrupted()) {
removeWaiter(q);
thrownewInterruptedException();
            }
ints=state;  //进行状态匹配if (s>COMPLETING) {
if (q!=null)
q.thread=null;
returns;
            }
elseif (s==COMPLETING) // cannot time out yetThread.yield();
elseif (q==null)
q=newWaitNode();
elseif (!queued)
queued=UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next=waiters, q);
elseif (timed) {
nanos=deadline-System.nanoTime();
if (nanos<=0L) {
removeWaiter(q);
returnstate;
                }
LockSupport.parkNanos(this, nanos);
            }
elseLockSupport.park(this);
        }
    }

还有其run方法:

publicvoidrun() {//采用casif (state!=NEW||!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V>c=callable;
if (c!=null&&state==NEW) {
Vresult;
booleanran;
try {
result=c.call();
ran=true;
            } catch (Throwableex) {
result=null;
ran=false;
setException(ex);
            }
if (ran)
set(result);
        }
    } finally {
// runner must be non-null until state is settled to// prevent concurrent calls to run()runner=null;
// state must be re-read after nulling runner to prevent// leaked interruptsints=state;
if (s>=INTERRUPTING)
handlePossibleCancellationInterrupt(s);
    }
}

也即可以看到jdk1.6之后,FutureTask采用park和cas来实现的。

下面来学习CompletionService。

如果向Executor提交一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的future,然后返回使用get方法,同时将timeout指定为0,从而通过轮询的方式来判断任务是否完成,但是这样有些繁琐。因此可以采用CompletionService,其将Executor和BlockingQueue的功能融合在一起,可以采用队列操作take、poll获取已完结的结果。--摘自《并发编程实战》

/*** @description: CompletionSerivce使用* <p>* 采用异步的方式一边处理新的任务,一边处理完成任务的结果* 也就是说在处理多个任务时,可以实现先处理的任务,先拿到结果* 采用 submit+take,不至于在一个任务没有完成的情况下,其余的结果不能处理* 你可以将其理解成Executor+BlockingQueue的结合体,此时你可以使用其实现* ExecutorCompletionService,进行异构并行* </p>* @author: lyz* @date: 2020/05/24 22:02**/publicclassCompletionServiceTest {
publicstaticvoidmain(String[] args) {
Longstart=System.currentTimeMillis();
//开启5个线程ExecutorServiceexs=Executors.newFixedThreadPool(4);
try {
inttaskCount=10;
// 结果集List<Integer>list=newArrayList<Integer>();
List<Future<Integer>>futureList=newArrayList<Future<Integer>>();
// 1.定义CompletionServiceCompletionService<Integer>completionService=newExecutorCompletionService<Integer>(exs);
// 2.添加任务,需要执行的业务for (inti=0; i<taskCount; i++) {
Future<Integer>future=completionService.submit(newTask(i+1));
futureList.add(future);
            }
// 3.获取结果for (inti=0; i<taskCount; i++) {
Integerresult=completionService.take().get();
System.out.println("任务i=="+result+"完成!"+newDate());
list.add(result);
            }
System.out.println("list="+list);
        } catch (Exceptione) {
e.printStackTrace();
        } finally {
//关闭线程池exs.shutdown();
        }
    }
//实现Callable接口,重写call方法staticclassTaskimplementsCallable<Integer> {
Integeri;
publicTask(Integeri) {
super();
this.i=i;
        }
@OverridepublicIntegercall() throwsException {
if (i==4) {
Thread.sleep(5000);
            } else {
Thread.sleep(1000);
            }
System.out.println("线程:"+Thread.currentThread().getName() +"任务i="+i+",执行完成!");
returni;
        }
    }
}

运行结果:

线程:pool-1-thread-2任务i=2,执行完成!线程:pool-1-thread-1任务i=1,执行完成!线程:pool-1-thread-3任务i=3,执行完成!任务i==2完成!SunMay2422:50:01CST2020任务i==1完成!SunMay2422:50:01CST2020任务i==3完成!SunMay2422:50:01CST2020线程:pool-1-thread-2任务i=5,执行完成!任务i==5完成!SunMay2422:50:02CST2020线程:pool-1-thread-1任务i=6,执行完成!线程:pool-1-thread-3任务i=7,执行完成!任务i==6完成!SunMay2422:50:02CST2020任务i==7完成!SunMay2422:50:02CST2020线程:pool-1-thread-2任务i=8,执行完成!任务i==8完成!SunMay2422:50:03CST2020线程:pool-1-thread-1任务i=9,执行完成!线程:pool-1-thread-3任务i=10,执行完成!任务i==9完成!SunMay2422:50:03CST2020任务i==10完成!SunMay2422:50:03CST2020线程:pool-1-thread-4任务i=4,执行完成!任务i==4完成!SunMay2422:50:05CST2020list=[2, 1, 3, 5, 6, 7, 8, 9, 10, 4]

可以看到其与FutureTask相比,运行的结果是不相同的,其返回结果的线程是乱序的,同时是先执行完的,先返回结果,不同于FutureTask的顺序返回。同时性能上优于FutureTask。当然,在JDK8中,我们可以是一个更为强大的CompletableFuture来实现异构应用。


目录
相关文章
|
4月前
|
Java
CompletionService 使用小结
CompletionService 使用小结
28 1
|
4月前
|
存储 缓存 安全
(八)深入并发之Runnable、Callable、FutureTask及CompletableFuture原理分析
关于Runnable、Callable接口大家可能在最开始学习Java多线程编程时,都曾学习过一个概念:在Java中创建多线程的方式有三种:继承Thread类、实现Runnable接口以及实现Callable接口。但是实则不然,真正创建多线程的方式只有一种:继承Thread类,因为只有`new Thread().start()`这种方式才能真正的映射一条OS的内核线程执行,而关于实现Runnable接口以及实现Callable接口创建出的Runnable、Callable对象在我看来只能姑且被称为“多线程任务”,因为无论是Runnable对象还是Callable对象,最终执行都要交由Threa
|
6月前
|
Java
Java并发编程:理解并使用Future和Callable接口
【2月更文挑战第25天】 在Java中,多线程编程是一个重要的概念,它允许我们同时执行多个任务。然而,有时候我们需要等待一个或多个线程完成,然后才能继续执行其他任务。这就需要使用到Future和Callable接口。本文将深入探讨这两个接口的用法,以及它们如何帮助我们更好地管理多线程。
|
6月前
|
Java API
java多线程之FutureTask、Future、CompletableFuture
java多线程之FutureTask、Future、CompletableFuture
256 0
|
Java
Java多线程 CompletionService和ExecutorCompletionService
Java多线程 CompletionService和ExecutorCompletionService
142 0
Java多线程 CompletionService和ExecutorCompletionService
|
Java
Java多线程 Future和FutureTask的区别
Java多线程 Future和FutureTask的区别
186 0
Java多线程 Future和FutureTask的区别
|
Java
Java多线程 Callable和Future
Java多线程 Callable和Future
129 0
Java多线程 Callable和Future
|
安全
线程池中CompletionService的应用
线程池中CompletionService的应用
114 0
|
消息中间件 Dubbo Java
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”
“既生 ExecutorService, 何生 CompletionService?”