从Future到CompleteableFuture的转化实践之路

简介: 从Future到CompleteableFuture的转化实践之路
业务背景


A平台调用B平台提供的soa接口查询司机详情信息,由于B平台提供的是批量查询接口,对批量查询接口,做了单次查询的数量限制,那就是限制每次查询的数量不能超过指定的值(100个)。


优化前:


而A平台为了提高获取信息的速度,考虑到使用多线程并发进行获取,于是第一版的代码是下面这样的:

private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
        List<Long> availableDriverIdList = new ArrayList<>();
        //获取批量调用分页数量
        int pageCount = PageUtil.getPageCount(driverIdList.size(), PAGE_SIZE);
        try {
            //根据分页数量设置线程池大小
            ExecutorService executor = new ThreadPoolExecutor(pageCount, pageCount,
                    ONE_THOUSAND, TimeUnit.MILLISECONDS,
                    new SynchronousQueue<>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            List<Future<List<DriverBO>>> resultFutureList = new ArrayList<>();
            for (int i = 1; i <= pageCount; i++) {
                    //根据分页数量,循环提交查询任务
                List<Long> pageDriverIdList = PageUtil.startPage(driverIdList, i, PAGE_SIZE);
                DriverListResquest driverListResquest = driverService.getDriverListRequest(pageDriverIdList, vehicleTypeIds);
                resultFutureList.add(executor.submit(() -> driverService.getDriverList(driverListResquest)));
            }
            //阻塞获取请求结果
            for (Future<List<DriverBO>> futureResult : resultFutureList) {
                List<DriverBO> driverBOList = futureResult.get();
                if (!CollectionUtil.isEmpty(driverBOList)) {
                    driverBOList.forEach(e -> availableDriverIdList.add(e.getDriverId()));
                }
            }
            //关闭线程池
            executor.shutdown();
        } catch (Exception e) {
            log.error("selectDriverDetailsBySoa,并发批量查询司机详情,处理出错", e);
        }
        log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
        return availableDriverIdList;
    }


缺点:虽然这里线程池的大小是根据分页数设置的,但每次请求都需要反复创建和停止线程池,在请求量大时这将是非常消耗资源的,也会是很致命的。


优化后:


经过优化调整之后,代码是这样的:

private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
        List<Long> availableDriverIdList = new ArrayList<>();
        try {
                //使用google第三方工具类直接进行分组(分页)
            List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, PAGE_SIZE);
            //分组提交任务
            List<CompletableFuture<List<DriverBO>>> completableFutureList = pagesDriverIdList.stream()
                    .map(pageDriverIdList -> asyncRequestSoaDriverSoaService(pageDriverIdList, vehicleTypeIds)).collect(Collectors.toList());
            //汇总处理结果
            List<List<DriverBO>> boLists = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
            boLists.forEach(listItem -> listItem.forEach(item -> availableDriverIdList.add(item.getDriverId())));
            log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
        } catch (Exception e) {
            log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理失败", e);
        }
        return availableDriverIdList;
    }
    private CompletableFuture<List<DriverBO>> asyncRequestSoaDriverSoaService(List<Long> driverIdList, List<Long> vehicleTypeIds){
            //封装请求参数
        DriverListResquest driverListResquest = driverService.getDriverListRequest(driverIdList, vehicleTypeIds);
        //使用针对业务统一管理的线程池,提交任务
        return AsyncExecutor.runAsync(() ->  sdpDriverService.getDriverList(driverListResquest));
    }


是不是简洁了很多。另外,针对特定业务场景,专门建了异步线程池类进行管理线程池:

@Slf4j
@NoArgsConstructor
@Component(ASYNC_EXECUTOR)
public class AsyncExecutor implements Executor {
    public static final String ASYNC_EXECUTOR = "asyncExecutor";
    public static ExecutorService executors;
    @Value("${async.executor.thread.corePoolSize:#{null}}")
    private Integer corePoolSize;
    @Value("${async.thread.maxPoolSize:#{null}}")
    private Integer maxPoolSize;
    @Value("${executor.thread.nameFormat:async-pool-%d}")
    private String threadNameFormat;
    @PostConstruct
    public void init() {
        log.info("async executor,corePoolSize:{},maxPoolSize{},threadNameFormat:{}", this.corePoolSize, this.maxPoolSize, this.threadNameFormat);
        ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat(this.threadNameFormat).build();
        if (Objects.isNull(corePoolSize)) {
            this.corePoolSize = Runtime.getRuntime().availableProcessors() * 4;
        }
        if (Objects.isNull(maxPoolSize)) {
            this.maxPoolSize = Runtime.getRuntime().availableProcessors() * 8;
        }
        AsyncExecutor.executors = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, 3000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }
    @Override
    public void execute(@NotNull Runnable command) {
        if (command == null) {
            throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
        } else {
            AsyncExecutor.executors.execute(new RunnableWrapper(command));
        }
    }
    public static <T> CompletableFuture<T> runAsync(@NonNull Supplier<T> supplier) {
        if (supplier == null) {
            throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
        } else {
            return CompletableFuture.supplyAsync(new SupplierWrapper<>(supplier), AsyncExecutor.executors);
        }
    }
}


其实可以看得出来,这里单纯使用CompletableFuture,并没有真正发挥出来其优势,因为上面的核心问题不在Future,而在线程池的使用不当上。而使用Java Stream的流式开发进行优化,同样可以达到上面代码的间接的效果。

public static List<Long> selectDriverDetailsBySoa(List<Long> driverIdList) {
    List<Long> availableDriverIdList = new ArrayList<>();
    List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, 1);
    List<Future<List<DriverBO>>> resultFutureList = pagesDriverIdList.stream()
            .map(pageDriverIdList -> GrabHallAsyncExecutor.executors.submit(() -> getDriverList(pageDriverIdList)))
            .collect(Collectors.toList());
    resultFutureList.stream().map(future -> getFutureResult(future))
            .collect(Collectors.toList())
            .forEach((e) -> e.forEach(o -> {
                availableDriverIdList.add(o.getDriverId());
            }));
    return availableDriverIdList;
}


那为什么还要用CompletableFuture的呢,一是为了和现有代码中的使用保持一致,其次是在学习之后,发现CompletableFuture比Future更好用,能力更强。


所以如果可以,推荐你也把CompletableFuture使用起来,真的很好用。

相关文章
|
1月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
47 3
|
5月前
|
存储
向量化代码实践问题之Task<T>类中的on_completed函数是如何工作的
向量化代码实践问题之Task<T>类中的on_completed函数是如何工作的
|
6月前
|
Java C#
经验大分享:Task的用法
经验大分享:Task的用法
45 0
|
7月前
|
安全 Java
【亮剑】Java中的`Future`接口代表异步计算结果,常与`ExecutorService`配合启动任务并获取结果
【4月更文挑战第30天】Java中的`Future`接口代表异步计算结果,常与`ExecutorService`配合启动任务并获取结果。`Future`接口提供`isDone()`、`get()`、`get(timeout, unit)`和`cancel(mayInterruptIfRunning)`等方法。`FutureTask`是`Future`的实现类,可作为`Runnable`执行并返回结果。
75 1
|
7月前
并发编程之Callable方法的详细解析(带小案例)
并发编程之Callable方法的详细解析(带小案例)
104 0
|
7月前
|
存储 消息中间件 分布式计算
流计算中的状态管理是什么?请解释其作用和常用方法。
流计算中的状态管理是什么?请解释其作用和常用方法。
93 0
|
设计模式 Java API
【JUC基础】15. Future模式
Future 模式是多线程开发中非常常见的一种设计模式,它的核心思想是异步调用。当我们需要调用一个函数方法时,如果这个函数执行得很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。
151 0
【JUC基础】15. Future模式
|
容器
并发编程-21J.U.C组件拓展之Future和FutureTask
并发编程-21J.U.C组件拓展之Future和FutureTask
71 0
|
Java API
Java并发编程-Future系列之Future的介绍和基本用法
Java并发编程-Future系列之Future的介绍和基本用法
195 0
Java并发编程-Future系列之Future的介绍和基本用法
|
缓存 Java
JDK源码(10)-Integer(用处最多,重点讲解)
JDK源码(10)-Integer(用处最多,重点讲解)
119 0
JDK源码(10)-Integer(用处最多,重点讲解)