开发者社区> 问答> 正文

为什么`parallelStream`比`CompletableFuture`实现更快?

我想在某种操作上提高后端REST API的性能,该操作顺序地轮询了多个不同的外部API并收集了它们的响应,并将它们全部拼合为一个响应列表。

刚了解CompletableFutures之后,我决定尝试一下,然后将该解决方案与仅将my更改stream为的解决方案进行比较parallelStream。

这是用于基准测试的代码:

package com.alithya.platon;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;


    public class ConcurrentTest {

        static final List<String> REST_APIS =
                Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
        MyTestUtil myTest = new MyTestUtil();
        long millisBefore; // used to benchmark

        @BeforeEach
        void setUp() {
            millisBefore = System.currentTimeMillis();
        }

        @AfterEach
        void tearDown() {
            System.out.printf("time taken : %.4fs\n",
                    (System.currentTimeMillis() - millisBefore) / 1000d);
        }

        @Test
        void parallelSolution() { // 4s
            var parallel = REST_APIS.parallelStream()
                    .map(api -> myTest.collectOneRestCall())
                    .flatMap(List::stream)
                    .collect(Collectors.toList());

            System.out.println("List of responses: " + parallel.toString());
        }

        @Test
        void futureSolution() throws Exception { // 8s
            var futures = myTest.collectAllResponsesAsync(REST_APIS);

            System.out.println("List of responses: " + futures.get()); // only blocks here
        }

        @Test
        void originalProblem() { // 32s
            var sequential = REST_APIS.stream()
                    .map(api -> myTest.collectOneRestCall())
                    .flatMap(List::stream)
                    .collect(Collectors.toList());

            System.out.println("List of responses: " + sequential.toString());
        }
    }


    class MyTestUtil {

        public static final List<String> RESULTS = Arrays.asList("1", "2", "3", "4");

        List<String> collectOneRestCall() {
            try {
                TimeUnit.SECONDS.sleep(4); // simulating the await of the response
            } catch (Exception io) {
                throw new RuntimeException(io);
            } finally {
                return MyTestUtil.RESULTS; // always return something, for this demonstration
            }
        }

        CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) {

            /* Collecting the list of all the async requests that build a List<String>. */
            List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream()
                    .map(api -> nonBlockingRestCall())
                    .collect(Collectors.toList());

            /* Creating a single Future that contains all the Futures we just created ("flatmap"). */
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
                    .toArray(new CompletableFuture[restApiUrlList.size()]));

            /* When all the Futures have completed, we join them to create merged List<String>. */
            CompletableFuture<List<String>> allCompletableFutures = allFutures
                    .thenApply(future -> completableFutures.stream()
                            .filter(Objects::nonNull) // we filter out the failed calls
                            .map(CompletableFuture::join)
                            .flatMap(List::stream) // creating a List<String> from List<List<String>>
                            .collect(Collectors.toList())
                    );

            return allCompletableFutures;
        }

        private CompletableFuture<List<String>> nonBlockingRestCall() {
            /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
            return CompletableFuture.supplyAsync(() -> collectOneRestCall())
                    .exceptionally(ex -> {
                        return null; // gets managed in the wrapping Future
                    });
        }

    }

这里列出了8个(伪)API。每个响应需要4秒钟的时间来执行,并返回4个实体的列表(为简单起见,在我们的示例中为String)。

结果:

stream :32秒 parallelStream :4秒 CompletableFuture :8秒 我很惊讶,并期望最后两个几乎相同。到底是什么造成了这种差异?据我所知,他们都使用ForkJoinPool.commonPool()。

我的天真的解释是parallelStream,由于它是一个阻塞操作,MainThread因此将实际值用于其工作负载,因此与CompletableFuture异步的(因此不能使用)相比,有一个额外的活动线程可以使用MainThread。

展开
收起
垚tutu 2019-12-04 17:15:37 1085 0
0 条回答
写回答
取消 提交回答
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载