从Future到CompleteableFuture的转化实践之路

简介: 从Future到CompleteableFuture的转化实践之路
业务背景


A平台调用B平台提供的soa接口查询司机详情信息,由于B平台提供的是批量查询接口,对批量查询接口,做了单次查询的数量限制,那就是限制每次查询的数量不能超过指定的值(100个)。


优化前:


而A平台为了提高获取信息的速度,考虑到使用多线程并发进行获取,于是第一版的代码是下面这样的:

private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
        List<Long> availableDriverIdList = new ArrayList<>();
        //获取批量调用分页数量
        int pageCount = PageUtil.getPageCount(driverIdList.size(), PAGE_SIZE);
        try {
            //根据分页数量设置线程池大小
            ExecutorService executor = new ThreadPoolExecutor(pageCount, pageCount,
                    ONE_THOUSAND, TimeUnit.MILLISECONDS,
                    new SynchronousQueue<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            List<Future<List<DriverBO>>> resultFutureList = new ArrayList<>();
            for (int i = 1; i <= pageCount; i++) {
                    //根据分页数量,循环提交查询任务
                List<Long> pageDriverIdList = PageUtil.startPage(driverIdList, i, PAGE_SIZE);
                DriverListResquest driverListResquest = driverService.getDriverListRequest(pageDriverIdList, vehicleTypeIds);
                resultFutureList.add(executor.submit(() -> driverService.getDriverList(driverListResquest)));
            }
            //阻塞获取请求结果
            for (Future<List<DriverBO>> futureResult : resultFutureList) {
                List<DriverBO> driverBOList = futureResult.get();
                if (!CollectionUtil.isEmpty(driverBOList)) {
                    driverBOList.forEach(e -> availableDriverIdList.add(e.getDriverId()));
                }
            }
            //关闭线程池
            executor.shutdown();
        } catch (Exception e) {
            log.error("selectDriverDetailsBySoa,并发批量查询司机详情,处理出错", e);
        }
        log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
        return availableDriverIdList;
    }


缺点:虽然这里线程池的大小是根据分页数设置的,但每次请求都需要反复创建和停止线程池,在请求量大时这将是非常消耗资源的,也会是很致命的。


优化后:


经过优化调整之后,代码是这样的:

private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
        List<Long> availableDriverIdList = new ArrayList<>();
        try {
                //使用google第三方工具类直接进行分组(分页)
            List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, PAGE_SIZE);
            //分组提交任务
            List<CompletableFuture<List<DriverBO>>> completableFutureList = pagesDriverIdList.stream()
                    .map(pageDriverIdList -> asyncRequestSoaDriverSoaService(pageDriverIdList, vehicleTypeIds)).collect(Collectors.toList());
            //汇总处理结果
            List<List<DriverBO>> boLists = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
            boLists.forEach(listItem -> listItem.forEach(item -> availableDriverIdList.add(item.getDriverId())));
            log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
        } catch (Exception e) {
            log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理失败", e);
        }
        return availableDriverIdList;
    }
    private CompletableFuture<List<DriverBO>> asyncRequestSoaDriverSoaService(List<Long> driverIdList, List<Long> vehicleTypeIds){
            //封装请求参数
        DriverListResquest driverListResquest = driverService.getDriverListRequest(driverIdList, vehicleTypeIds);
        //使用针对业务统一管理的线程池,提交任务
        return AsyncExecutor.runAsync(() ->  sdpDriverService.getDriverList(driverListResquest));
    }


是不是简洁了很多。另外,针对特定业务场景,专门建了异步线程池类进行管理线程池:

@Slf4j
@NoArgsConstructor
@Component(ASYNC_EXECUTOR)
public class AsyncExecutor implements Executor {
    public static final String ASYNC_EXECUTOR = "asyncExecutor";
    public static ExecutorService executors;
    @Value("${async.executor.thread.corePoolSize:#{null}}")
    private Integer corePoolSize;
    @Value("${async.thread.maxPoolSize:#{null}}")
    private Integer maxPoolSize;
    @Value("${executor.thread.nameFormat:async-pool-%d}")
    private String threadNameFormat;
    @PostConstruct
    public void init() {
        log.info("async executor,corePoolSize:{},maxPoolSize{},threadNameFormat:{}", this.corePoolSize, this.maxPoolSize, this.threadNameFormat);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat(this.threadNameFormat).build();
        if (Objects.isNull(corePoolSize)) {
            this.corePoolSize = Runtime.getRuntime().availableProcessors() * 4;
        }
        if (Objects.isNull(maxPoolSize)) {
            this.maxPoolSize = Runtime.getRuntime().availableProcessors() * 8;
        }
        AsyncExecutor.executors = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, 3000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }
    @Override
    public void execute(@NotNull Runnable command) {
        if (command == null) {
            throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
        } else {
            AsyncExecutor.executors.execute(new RunnableWrapper(command));
        }
    }
    public static <T> CompletableFuture<T> runAsync(@NonNull Supplier<T> supplier) {
        if (supplier == null) {
            throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
        } else {
            return CompletableFuture.supplyAsync(new SupplierWrapper<>(supplier), AsyncExecutor.executors);
        }
    }
}


其实可以看得出来,这里单纯使用CompletableFuture,并没有真正发挥出来其优势,因为上面的核心问题不在Future,而在线程池的使用不当上。而使用Java Stream的流式开发进行优化,同样可以达到上面代码的间接的效果。

public static List<Long> selectDriverDetailsBySoa(List<Long> driverIdList) {
    List<Long> availableDriverIdList = new ArrayList<>();
    List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, 1);
    List<Future<List<DriverBO>>> resultFutureList = pagesDriverIdList.stream()
            .map(pageDriverIdList -> GrabHallAsyncExecutor.executors.submit(() -> getDriverList(pageDriverIdList)))
            .collect(Collectors.toList());
    resultFutureList.stream().map(future -> getFutureResult(future))
            .collect(Collectors.toList())
            .forEach((e) -> e.forEach(o -> {
                availableDriverIdList.add(o.getDriverId());
            }));
    return availableDriverIdList;
}


那为什么还要用CompletableFuture的呢,一是为了和现有代码中的使用保持一致,其次是在学习之后,发现CompletableFuture比Future更好用,能力更强。


所以如果可以,推荐你也把CompletableFuture使用起来,真的很好用。

相关文章
|
4月前
|
JSON 前端开发 Java
JAVA后端向前端传递Long类型数据,导致数据不一致
JAVA后端向前端传递Long类型数据,导致数据不一致
90 0
|
6月前
|
并行计算 Java
【Future&ForkJoin框架原理】
【Future&ForkJoin框架原理】
|
10月前
|
设计模式 Java API
【JUC基础】15. Future模式
Future 模式是多线程开发中非常常见的一种设计模式,它的核心思想是异步调用。当我们需要调用一个函数方法时,如果这个函数执行得很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
113 0
【JUC基础】15. Future模式
|
11月前
|
容器
并发编程-21J.U.C组件拓展之Future和FutureTask
并发编程-21J.U.C组件拓展之Future和FutureTask
47 0
|
Java API
Java并发编程-Future系列之Future的介绍和基本用法
Java并发编程-Future系列之Future的介绍和基本用法
162 0
Java并发编程-Future系列之Future的介绍和基本用法
|
JavaScript 安全 开发者
重新认识number类型
重新认识number类型
141 0
|
并行计算 API
|
缓存 Java
通过JDK源码角度分析Long类详解
通过JDK源码角度分析Long类详解
通过JDK源码角度分析Long类详解
两个例子解释Callable 和 Future接口
两个例子解释Callable 和 Future接口
|
Java API
java8实战读书笔记:数值流、Stream创建与Optional类的使用
java8实战读书笔记:数值流、Stream创建与Optional类的使用
java8实战读书笔记:数值流、Stream创建与Optional类的使用