异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture

简介: 异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture


JDK8 Stream


JDK8中提供了流式对数据进行处理的功能,它的出现允许我们以声明式方式对数据集合进行处理。所谓声明式是相对于我们平时所用的命令式编程来说的,使用声明式编程会让我们对业务的表达更清晰。另外使用流可以让我们很方便地对数据集进行并行处理。


比如下面的代码,我们从person列表中过滤出年龄大于10岁的人,并且收集对应的name字段到list,然后统一打印处理。在使用非Stream的情况下,我们会使用如下代码来实现。


public static List<Person> makeList() {
    List<Person> personList = new ArrayList<Person>();
    Person p1 = new Person();
    p1.setAge(10);
    p1.setName("zlx");
    personList.add(p1);
    p1 = new Person();
    p1.setAge(12);
    p1.setName("jiaduo");
    personList.add(p1);
    p1 = new Person();
    p1.setAge(5);
    p1.setName("ruoran");
    personList.add(p1);
    return personList;
}
    public static void noStream(List<Person> personList) {
    List<String> nameList = new ArrayList<>();
    for (Person person : personList) {
        if (person.age >= 10) {
            nameList.add(person.getName());
        }
    }
    for(String name: nameList) {
        System.out.println(name);
    }
}
    public static void main(String[] args) {
        List<Person> personList = makeList();
        noStream(personList);
    }


从上述代码可知,noStream方法是典型的命令式编码,我们用for循环来一个个判断当前person对象中的age字段值是否大于等于10,如果是则把当前对象的name字段放到手动创建的nameList列表中,然后再开启新的for循环逐个遍历nameList中的name字段。


下面我们使用Stream方式来修改上面的代码。

public static void useStream(List<Person> personList) {
    List<String> nameList = personList.stream().filter(person -> person.getAge() >= 10)// 1.过滤大于等于10的age字段值
            .map(person -> person.getName())// 2.使用map映射元素
            .collect(Collectors.toList());// 3.收集映射后元素
    nameList.stream().forEach(name -> System.out.println(name));
}


在上面的代码中我们首先从personList获取到流对象,然后在其上进行了filter运算,过滤出年龄大于等于10的person,然后运用map方法映射person对象到name字段,再使用collect方法收集所有的name字段到nameList,最后从nameList上获取流并调用forEach进行打印。


上面的代码就是声明式编程,其可读性很强,代码直接可以说明想要什么(从代码就可以知道我们要过滤出年龄大于等于10岁的人,并且把满足条件的person的name字段收集起来,然后打印)。


需要注意的是,这里的filter和map操作是中间操作符,也就是当我们在流上施加这些操作时并不会真的被执行。而collect操作是终端操作符,当在流上执行终端操作符时,流上施加的操作才会执行。


JDK8中对于Steam提供了很多操作符,只是简单的列出了filter、map、collect这几种方法。




Stream遇见CompletableFuture


下面我们来看看当Stream与CompletableFuture相结合时会产生什么样的火花。


首先我们来看一个需求,这个需求是消费端对服务提供方集群中的某个服务进行广播调用(轮询调用同一个服务的不同提供者的机器),正常同步调用代码如下所示。

public class StreamTestFuture {
    public static String rpcCall(String ip, String param) {
        System.out.println(ip + " rpcCall:" + param);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return param;
    }
    public static void main(String[] args) {
        // 1.生成ip列表
        List<String> ipList = new ArrayList<String>();
        for (int i = 1; i <=10; ++i) {
            ipList.add("192.168.0." + i);
        }
        // 2.发起广播调用
        long start = System.currentTimeMillis();
        List<String> result = new ArrayList<>();
        for (String ip : ipList) {
            result.add(rpcCall(ip, ip));
        }
        // 3.输出
        result.stream().forEach(r -> System.out.println(r));
        System.out.println("cost:" + (System.currentTimeMillis() - start));
    }


   代码1生成ip列表,这代表了所有服务提供者的机器ip。

   代码2轮询每个ip,使用ip作为参数调用rpcCall方法(这里面使用休眠1s来模拟远程rpc过程执行)并且把结果保存到result中。

   代码3则等所有服务调用完成后打印执行结果,运行上面代码时会发现耗时大概为10s,这是因为代码2发起广播调用是顺序的,也就是当上次rpc调用返回结果后才会进行下一次调用。


下面我们借用Stream和CompletableFuture来看看业务线程如何并发地发起多次rpc请求,从而缩短整个处理流程的耗时。

   // 1.生成ip列表
        List<String> ipList = new ArrayList<String>();
        for (int i = 1; i <= 10; ++i) {
            ipList.add("192.168.0." + i);
        }
        // 2.并发调用
        long start = System.currentTimeMillis();
        List<CompletableFuture<String>> futureList = ipList.stream()
                .map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, ip)))//同步转换为异步
                .collect(Collectors.toList());//收集结果
       //3.等待所有异步任务执行完毕
        List<String> resultList = futureList.stream()
                                           .map(future -> future.join())
                                             //同步等待结果
                                           .collect(Collectors.toList());
                                             //对结果进行收集
        // 4.输出
        resultList.stream().forEach(r -> System.out.println(r));
        System.out.println("cost:" + (System.currentTimeMillis() - start));



   代码2从ipList处获取了stream,然后通过map操作符把ip转换为远程调用。


   注意,这里通过使用CompletableFuture.supplyAsync方法把rpc的同步调用转换为了异步,也就是把同步调用结果转换为了CompletableFuture对象,所以操作符map返回的是一个CompletableFuture,然后collect操作把所有的CompletableFuture对象收集为list后返回。


   此外,这里多个rpc调用时是并发执行的,不是顺序执行,因为CompletableFuture.supplyAsync方法把rpc的同步调用转换为了异步。


   代码3从futureList获取流,然后使用map操作符把future对象转换为future的执行结果,这里是使用future的join方法来阻塞获取每个异步任务执行完毕,然后返回执行结果,最后使用collect操作把所有的结果收集到resultList。

   代码4从resultList获取流,然后打印结果。


   运行上面的代码会发现耗时大大减少了,这可以证明上面10个rpc调用时是并发运行的,并不是串行执行。


注意:具体这10个rpc请求是否全部并发运行取决于CompletableFuture内线程池内线程的个数,如果你的机器是单核的或者线程池内线程个数为1,那么这10个任务还是会顺序执行的。



小结


我们了解了CompletableFuture如何解决其缺点,以及CompletableFuture与JDK Stream是如何完美结合的,可知使用CompletableFuture实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了编程者的负担。



相关文章
|
6月前
|
API
重学JDK8新特性之Stream(上)
重学JDK8新特性之Stream(上)
42 0
|
6月前
JDK8之stream流的使用:分组
JDK8之stream流的使用:分组
285 0
|
3月前
|
API
JDK8的stream有求和方法吗?
【8月更文挑战第20天】JDK8的stream有求和方法吗?
123 3
|
6月前
|
安全 Java 数据库
重学JDK8新特性之Stream(下)
重学JDK8新特性之Stream
60 0
|
6月前
|
Java API
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
|
6月前
|
Java API 数据处理
JDK 8:Stream API——数据处理的新篇章
JDK 8引入了Stream API,为Java中的数据处理提供了一种全新的方式。本文将详细介绍Stream API的原理、优势以及如何在实际开发中应用这一特性。
|
6月前
|
存储 Java 关系型数据库
JDK8中的新特性(Lambda、函数式接口、方法引用、Stream)(二)
JDK8中的新特性(Lambda、函数式接口、方法引用、Stream)(二)
|
6月前
|
JavaScript 前端开发 Java
JDK8中的新特性(Lambda、函数式接口、方法引用、Stream)(一)
JDK8中的新特性(Lambda、函数式接口、方法引用、Stream)(一)
jdk8 Stream流中将集合转成map,重复key处理,统计最大值,获取某个属性集合等10种最常用方法
jdk8 Stream流中将集合转成map,重复key处理,统计最大值,获取某个属性集合等10种最常用方法
170 5
|
6月前
JDK8之stream流的使用:归约类方法
JDK8之stream流的使用:归约类方法
42 0