业务背景
A平台调用B平台提供的soa接口查询司机详情信息,由于B平台提供的是批量查询接口,对批量查询接口,做了单次查询的数量限制,那就是限制每次查询的数量不能超过指定的值(100个)。
优化前:
而A平台为了提高获取信息的速度,考虑到使用多线程并发进行获取,于是第一版的代码是下面这样的:
private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) { List<Long> availableDriverIdList = new ArrayList<>(); //获取批量调用分页数量 int pageCount = PageUtil.getPageCount(driverIdList.size(), PAGE_SIZE); try { //根据分页数量设置线程池大小 ExecutorService executor = new ThreadPoolExecutor(pageCount, pageCount, ONE_THOUSAND, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); List<Future<List<DriverBO>>> resultFutureList = new ArrayList<>(); for (int i = 1; i <= pageCount; i++) { //根据分页数量,循环提交查询任务 List<Long> pageDriverIdList = PageUtil.startPage(driverIdList, i, PAGE_SIZE); DriverListResquest driverListResquest = driverService.getDriverListRequest(pageDriverIdList, vehicleTypeIds); resultFutureList.add(executor.submit(() -> driverService.getDriverList(driverListResquest))); } //阻塞获取请求结果 for (Future<List<DriverBO>> futureResult : resultFutureList) { List<DriverBO> driverBOList = futureResult.get(); if (!CollectionUtil.isEmpty(driverBOList)) { driverBOList.forEach(e -> availableDriverIdList.add(e.getDriverId())); } } //关闭线程池 executor.shutdown(); } catch (Exception e) { log.error("selectDriverDetailsBySoa,并发批量查询司机详情,处理出错", e); } log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList)); return availableDriverIdList; }
缺点:虽然这里线程池的大小是根据分页数设置的,但每次请求都需要反复创建和停止线程池,在请求量大时这将是非常消耗资源的,也会是很致命的。
优化后:
经过优化调整之后,代码是这样的:
private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) { List<Long> availableDriverIdList = new ArrayList<>(); try { //使用google第三方工具类直接进行分组(分页) List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, PAGE_SIZE); //分组提交任务 List<CompletableFuture<List<DriverBO>>> completableFutureList = pagesDriverIdList.stream() .map(pageDriverIdList -> asyncRequestSoaDriverSoaService(pageDriverIdList, vehicleTypeIds)).collect(Collectors.toList()); //汇总处理结果 List<List<DriverBO>> boLists = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()); boLists.forEach(listItem -> listItem.forEach(item -> availableDriverIdList.add(item.getDriverId()))); log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList)); } catch (Exception e) { log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理失败", e); } return availableDriverIdList; } private CompletableFuture<List<DriverBO>> asyncRequestSoaDriverSoaService(List<Long> driverIdList, List<Long> vehicleTypeIds){ //封装请求参数 DriverListResquest driverListResquest = driverService.getDriverListRequest(driverIdList, vehicleTypeIds); //使用针对业务统一管理的线程池,提交任务 return AsyncExecutor.runAsync(() -> sdpDriverService.getDriverList(driverListResquest)); }
是不是简洁了很多。另外,针对特定业务场景,专门建了异步线程池类进行管理线程池:
@Slf4j @NoArgsConstructor @Component(ASYNC_EXECUTOR) public class AsyncExecutor implements Executor { public static final String ASYNC_EXECUTOR = "asyncExecutor"; public static ExecutorService executors; @Value("${async.executor.thread.corePoolSize:#{null}}") private Integer corePoolSize; @Value("${async.thread.maxPoolSize:#{null}}") private Integer maxPoolSize; @Value("${executor.thread.nameFormat:async-pool-%d}") private String threadNameFormat; @PostConstruct public void init() { log.info("async executor,corePoolSize:{},maxPoolSize{},threadNameFormat:{}", this.corePoolSize, this.maxPoolSize, this.threadNameFormat); ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat(this.threadNameFormat).build(); if (Objects.isNull(corePoolSize)) { this.corePoolSize = Runtime.getRuntime().availableProcessors() * 4; } if (Objects.isNull(maxPoolSize)) { this.maxPoolSize = Runtime.getRuntime().availableProcessors() * 8; } AsyncExecutor.executors = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy()); } @Override public void execute(@NotNull Runnable command) { if (command == null) { throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null"); } else { AsyncExecutor.executors.execute(new RunnableWrapper(command)); } } public static <T> CompletableFuture<T> runAsync(@NonNull Supplier<T> supplier) { if (supplier == null) { throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null"); } else { return CompletableFuture.supplyAsync(new SupplierWrapper<>(supplier), AsyncExecutor.executors); } } }
其实可以看得出来,这里单纯使用CompletableFuture,并没有真正发挥出来其优势,因为上面的核心问题不在Future,而在线程池的使用不当上。而使用Java Stream的流式开发进行优化,同样可以达到上面代码的间接的效果。
public static List<Long> selectDriverDetailsBySoa(List<Long> driverIdList) { List<Long> availableDriverIdList = new ArrayList<>(); List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, 1); List<Future<List<DriverBO>>> resultFutureList = pagesDriverIdList.stream() .map(pageDriverIdList -> GrabHallAsyncExecutor.executors.submit(() -> getDriverList(pageDriverIdList))) .collect(Collectors.toList()); resultFutureList.stream().map(future -> getFutureResult(future)) .collect(Collectors.toList()) .forEach((e) -> e.forEach(o -> { availableDriverIdList.add(o.getDriverId()); })); return availableDriverIdList; }
那为什么还要用CompletableFuture的呢,一是为了和现有代码中的使用保持一致,其次是在学习之后,发现CompletableFuture比Future更好用,能力更强。
所以如果可以,推荐你也把CompletableFuture使用起来,真的很好用。