我们知道使用多线程时,最初的Thread到线程池,此时对于线程的使用,提供了其使用的复用率。而实现多线程的三种方式:继承Thread;实现Runnable接口,重写run方法;实现Callable接口,同时重写call方法,同时通过Future获取执行的返回值。也就是说callable执行任务,而Future拿到执行的结果。Future具有阻塞性在于其get()方法具有阻塞性,而isDone()是不具有阻塞性的。
通常使用线程池+Runnable的时候,会发现Runnable不能返回值,也就执行的结果情况,同时对于出现异常,我们获取异常信息,进行相应的处理。如果需要返回结果,同时需要进一步加工的时候,就可以考虑使用Future+Callable了。同时接口Future的默认实现是FutureTask,因此对于其实现get()方法,会有一个问题,就是如果前面的任务一旦执行的时间耗时较长的时候,就会出现一直阻塞的状态,此时就会出现排队等待的状态,大大影响其性能。适用场景:当一个线程需要等待另一个线程把某个任务执行完成后它才能继续执行,此时可以使用FutureTask。因为FutureTask基于AQS实现,因此其具有阻塞性。
Future的使用
/**** @description: Future使用* <p>* 实现callable接口,同时重写call方法,其优点:与Runnable不同的是,其可以返回结果,* 同时可以声明异常,返回一个执行检查的异常信息,而Runnable返回的是void,* 因此在程序上方便排查问题,同时了解执行的结果情况,如果返回的结果想是void的,则可以在实现时选择* Callable<void>* </p>* @author: lyz* @date: 2020/05/24 11:23**/publicclassFutureTest { staticclassMyCallableimplementsCallable<String>{ publicStringcall() throwsException { //业务逻辑执行部分log.info("do something in callable"); Thread.sleep(1000); return"Done"; } } publicstaticvoidmain(String[] args) throwsInterruptedException, ExecutionException { //ExecutorService executorService = Executors.newCachedThreadPool();ThreadPoolExecutorexecutorService=newThreadPoolExecutor(2,3,5L,TimeUnit.SECONDS,newLinkedBlockingDeque<>()); //执行需要提交的任务,其get()方法拿到需要的结果Future<String>future=executorService.submit(newMyCallable()); log.info("do something in main"); Thread.sleep(1000); //拿到执行后返回的结果Stringresult=future.get(); log.info("result:{}",result); executorService.shutdown(); } }
执行结果:
INFO [main] -dosomethinginmainINFO [pool-1-thread-1] -dosomethingincallableINFO [main] -result:Done
Future+Runnable,不带返回值:
/**** @description: Future使用* <p>* 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,* 也可以传入Runnable对象,说明submit()方法支持有返回值和无返回值的功能* get具有阻塞性,而isDone不阻塞* Callable<void>* </p>* @author: lyz* @date: 2020/05/24 11:28**/publicclassFutureTest2 { publicstaticvoidmain(String[] args) { try{ /* Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("查看get结果打印信息");}};*///采用lambda表达式Runnablerunnable= ()->System.out.println("查看get结果打印信息"); ExecutorServiceexecutor=Executors.newCachedThreadPool(); Futurefuture=executor.submit(runnable); //此时get返回的值为null,说明支持void的方式System.out.println("future get result:"+future.get()+" and isDone:"+future.isDone()); }catch(Exceptione){ log.error("do the thing error:{}"+e.getMessage()); } } }
运行结果:
查看get结果打印信息futuregetresult:nullandisDone:true
Future+Runnable,带返回结果
/**** @description: Future使用* <p>* 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,* 而且还可以Runnable,同时可以从api中可以看到submit(Runnable,result)携带返回信息* </p>* @author: lyz* @date: 2020/05/24 11:38**/publicclassFutureTest3 { //实现Runnable接口,重写run方法staticclassMyRunnableimplementsRunnable{ privateUseruser; publicMyRunnable(Useruser){ this.user=user; } publicvoidrun() { user.setUsername("在路上"); user.setPassword("123456"); } } publicstaticvoidmain(String[] args) { try{ Useruser=newUser("123","345"); MyRunnablemyRunnable=newMyRunnable(user); ThreadPoolExecutorexecutor=newThreadPoolExecutor(10,10,10, TimeUnit.SECONDS,newLinkedBlockingDeque<Runnable>()); Future<User>future=executor.submit(myRunnable,user); System.out.println("start Time="+System.currentTimeMillis()); user=future.get(); System.out.println("get Value="+user.getUsername()+"==="+user.getPassword()); System.out.println("end Time="+System.currentTimeMillis()); }catch (Exceptione){ log.info("执行异常:{}",e.getMessage()); } } }
运行结果:
startTime=1590322809154getValue=在路上===123456endTime=1590322809157
执行多个任务:
/*** @description: Future使用* <p>* 实现callable接口,同时重写call方法,方法submit不仅可以传入Callable对象,* 而且还可以Runnable,同时可以从api中可以看到submit(Runnable,result)* get具有阻塞性,而isDone不阻塞。cancel与isCancelled使用* 其优点是从线程中返回数据以便进行后期的处理,其缺点是具有阻塞性* Callable<void>* </p>* @author: lyz* @date: 2020/05/24 11:48**/publicclassFutureTest4 { publicstaticvoidmain(String[] args) { try { MyCallablecallable1=newMyCallable("123", 5000); MyCallablecallable2=newMyCallable("456", 4000); MyCallablecallable3=newMyCallable("236", 3000); MyCallablecallable4=newMyCallable("678", 2000); MyCallablecallable5=newMyCallable("789", 1000); List<Callable>callableList=newArrayList<>(); callableList.add(callable1); callableList.add(callable2); callableList.add(callable3); callableList.add(callable4); callableList.add(callable5); List<Future>futureList=newArrayList<>(); ThreadPoolExecutorexecutor=newThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, newLinkedBlockingDeque<>()); for (inti=0; i<5; i++) { futureList.add(executor.submit(callableList.get(i))); } System.out.println("run first time="+System.currentTimeMillis()); for (inti=0; i<5; i++) { System.out.println(futureList.get(i).get()+" "+System.currentTimeMillis()); } } catch (Exceptione) { log.error("执行任务出错:{}",e.getMessage()); } } }
运行结果:
runfirsttime=1590322937399456678123236789return1231590322942515return4561590322942515return2361590322942515return6781590322942515return7891590322942515
从运行结果可以看到返回的结果线程是有序的,也即其会等待线程运行完成才会返回结果,而执行的线程我们可以不是有序的,是乱序的。因此可以看到其如果执行的程序中有一段任务出现执行过程较长时,就会被阻塞,进行排队。
使用FutureTask:
/**** @description: Future使用* <p>* FutureTask:是Future的实现类,而且在使用线程池时,默认的情况下也是使用* futureTask类作为接口Future的实现类,但需要注意的是,Future接口调用get()方法* 取得处理的结果时是阻塞性的,也就是如果调用get()方法时,任务尚未完成,则* 调用get()方法时一直阻塞到此任务完成时为止。如果是这样的相关,则前面先执行的任务* 一旦耗时很多,则后面的任务调用get()方法就呈现阻塞状态,也就是排队等待,大大影响运行效率。* 也即主线程并不能保证首先获的是最先完成任务的返回值,这是future的缺点,影响效率* </p>* @author: lyz* @date: 2020/05/24 12:00**/publicclassFutureTask { /*** FutureTask由线程池执行*/privatestaticvoidexeForPool(){ // 创建 FutureTask,采用三个线程执行主线程java.util.concurrent.FutureTask<Integer>futureTask=newjava.util.concurrent.FutureTask<>(()->1+2); // 创建线程池ThreadPoolExecutorexecutor=ThreadPoolBuilder.fixedPool().build(); try{ // 提交 FutureTaskexecutor.submit(futureTask); // 获取计算结果Integerresult=futureTask.get(); System.out.println(result); }catch(Exceptione){ e.printStackTrace(); }finally { //进行优雅关闭ThreadPoolUtil.gracefulShutdown(executor,1); } } /*** FutureTask由线程处理*/privatestaticvoidexeForThread(){ // 创建 FutureTaskjava.util.concurrent.FutureTask<Integer>futureTask=newjava.util.concurrent.FutureTask<>(()->1+2); // 创建并启动线程ThreadT1=newThread(futureTask); T1.start(); // 获取计算结果try{ Integerresult=futureTask.get(); System.out.println(result); }catch (Exceptione){ e.printStackTrace(); } } /*** 利用FutureTask实现烧水泡茶*/privatestaticvoidfireWater(){ // 创建任务 T2 的 FutureTaskjava.util.concurrent.FutureTask<String>ft2=newjava.util.concurrent.FutureTask<>(newT2Task()); // 创建任务 T1 的 FutureTaskjava.util.concurrent.FutureTask<String>ft1=newjava.util.concurrent.FutureTask<>(newT1Task(ft2)); // 线程 T1 执行任务 ft1Threadt1=newThread(ft1); t1.start(); // 线程 T2 执行任务 ft2Threadt2=newThread(ft2); t2.start(); // 等待线程 T1 执行结果try{ System.out.println(ft1.get()); }catch (Exceptione){ e.printStackTrace(); } } /*** 洗水壶、烧开水、泡茶,实现Callable接口,重写call方法*/staticclassT1TaskimplementsCallable<String> { java.util.concurrent.FutureTask<String>ft2; T1Task(java.util.concurrent.FutureTask<String>ft2){ this.ft2=ft2; } publicStringcall() throwsException { System.out.println("T1: 洗水壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 烧开水..."); TimeUnit.SECONDS.sleep(15); // 获取 T2 线程的茶叶Stringtf=ft2.get(); System.out.println("T1: 拿到茶叶:"+tf); System.out.println("T1: 泡茶..."); return" 上茶:"+tf; } } /*** 洗茶壶、洗茶杯、拿茶叶,实现Callable接口,重写call方法*/staticclassT2TaskimplementsCallable<String> { publicStringcall() throwsException { System.out.println("T2: 洗茶壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 洗茶杯..."); TimeUnit.SECONDS.sleep(2); System.out.println("T2: 拿茶叶..."); TimeUnit.SECONDS.sleep(1); return" 白茶 "; } } publicstaticvoidmain(String[] args) { exeForPool(); exeForThread(); fireWater(); } }
运行结果:
33T2: 洗茶壶... T1: 洗水壶... T2: 洗茶杯... T1: 烧开水... T2: 拿茶叶... T1: 拿到茶叶: 白茶T1: 泡茶... 上茶: 白茶
前面说到其阻塞性,影响了其运行的效率,而在jdk1.5之后,引入了CompletionService,CompletionService可以一边执行新的任务,一边处理返回的结果,将结果进行返回,采用submit+take或者采用poll()方法,而采用poll方法不具有阻塞性,因此性能上有提高。