背景:
当调用一些耗时接口时,如果我们一直在原地等待方法返回,整体程序的运行效率会大大降低。可以把调用的过程放到子线程去执行,再通过 Future 去控制子线程的调用过程,最后获取到计算结果。提高整个程序的运行效率。
创建线程池:
publicclassExecutorConfig { privatefinalstaticintTHREAD_COUNT=Runtime.getRuntime().availableProcessors() *2; name="batchCallThreadPool") (publicThreadPoolExecutorbatchPredictThreadPool() { returnnewThreadPoolExecutor(THREAD_COUNT, 200, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>(2048), newThreadFactoryBuilder().setNameFormat("batch-call-pool-%d").build()); } }
利用Future获取线程执行结果
importjava.util.List; importjava.util.Map; importjava.util.concurrent.ExecutionException; importjava.util.concurrent.Future; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.TimeoutException; importcom.google.common.collect.Maps; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Qualifier; publicclassFutureTestService { privateCallServicecallService; "batchPredictThreadPool") (privateThreadPoolExecutorbatchPredictThreadPool; publicvoidfun(List<Long>idList) { Map<Long, Future<Integer>>futureMap=Maps.newHashMapWithExpectedSize(idList.size()); // 让所有调用的子线程启动,参与竞争for (Longid : idList) { Future<Integer>future=batchPredictThreadPool.submit(() ->callService.call(id)); futureMap.put(id, future); } for (Longid : futureMap.keySet()) { try { // 阻塞获取执行结果,如果 3s 未获取到会抛出超时异常Integerresult=futureMap.get(id).get(3000, TimeUnit.MILLISECONDS); } catch (TimeoutExceptione) { // 处理超时 } catch (ExecutionExceptione) { // 处理执行时异常 } catch (InterruptedExceptione) { // 处理 中断 } } } }
利用CompletableFuture获取线程执行结果
importjava.util.HashMap; importjava.util.LinkedList; importjava.util.List; importjava.util.Map; importjava.util.concurrent.CompletableFuture; importjava.util.concurrent.ExecutionException; importjava.util.concurrent.ThreadPoolExecutor; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.TimeoutException; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Qualifier; publicclassFutureTestService { privateCallServicecallService; "batchPredictThreadPool") (privateThreadPoolExecutorbatchPredictThreadPool; publicvoidfun(List<Long>idList) throwsExecutionException, InterruptedException, TimeoutException { List<CompletableFuture<Integer>>completableFutureList=newLinkedList<>(); Map<Long, Integer>resultMap=newHashMap<>(idList.size()); // 让所有调用的子线程启动,参与竞争for (Longid : idList) { CompletableFuture<Integer>future=CompletableFuture.supplyAsync(() -> { Integerresult=callService.call(id); resultMap.put(id, result); returnnull; }, batchPredictThreadPool); completableFutureList.add(future); } // 在此处聚合CompletableFuture<Void>allCompletableFuture=CompletableFuture.allOf(completableFutureList.toArray( newCompletableFuture[completableFutureList.size()])); /*** 如果在 3 秒钟之内这些任务都可以顺利返回,则这个 get 方法就可以及时正常返回,并且往下执行。* 如果有某一个任务没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 方法便会抛出 TimeoutException 异常* 会尝试等待所有的任务完成,但是最多只会等 3 秒钟,在此之间,如及时完成则及时返回。*/allCompletableFuture.get(3, TimeUnit.SECONDS); } }