Java8 异步编程

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 使用异步编程的方式,可以发挥出多核 CPU 的优势,最大程度提升程序性能。Java 作为一门使用最广泛的编程语言,自然在语法上支持了这个特性。本文详细描述了异步编程的各种语法。

本文大纲速看

一、异步编程

通常来说,程序都是顺序执行,同一时刻只会发生一件事情。如果一个函数依赖于另一个函数的结果,它只能等待那个函数结束才能继续执行,从用户角度来说,整个程序才算执行完毕。

但现在的计算机普遍拥有多核 CPU,在那里干等着毫无意义,完全可以在另一个处理器内核上干其他工作,耗时长的任务结束之后会主动通知你。这就是异步编程的出发点:充分使用多核 CPU 的优势,最大程度提高程序性能。

一句话来说:所谓异步编程,就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法。

二、抛出一个问题:如何实现烧水泡茶的程序

最后我们会使用传统方式和 Java8 异步编程方式分别实现,来对比一下实现复杂度。

三、Java5 的 Future 实现的异步编程

Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用 isDone() 方法检查计算是否完成,或者使用 get() 方法阻塞住调用线程,直到计算完成返回结果,也可以使用 cancel() 方法停止任务的执行。

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(5);
        Future<Integer> f = es.submit(() -> 100);
        System.out.println(f.get());
        es.shutdown();
    }

虽然 Future 提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时的获取结果。

当然,很多其他的语言采用回调的方式来实现异步编程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也扩展了 Future 接口,提供了很多回调的机制,封装了工具类,辅助异步编程开发。

Java 作为老牌编程语言,自然也不会落伍。在 Java 8 中,新增了一个包含 50 多个方法的类:CompletableFuture,提供了非常强大的 Future 扩展功能,可以帮助我们简化异步编程的复杂性,提供函数式编程的能力。

四、CompletableFuture 类功能概览

如下图是 CompletableFuture 实现的接口:

它实现了 Future 接口,拥有 Future 所有的特性,比如可以使用 get() 方法获取返回值等;还实现了 CompletionStage 接口,这个接口有超过 40 个方法,功能太丰富了,它主要是为了编排任务的工作流。

我们可以把工作流和工作流之间的关系分类为三种:串行关系,并行关系,汇聚关系。

  • 串行关系

提供了如下的 api 来实现(先大致浏览一遍):

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
  • 并行关系

多线程异步执行就是并行关系

  • 汇聚关系

汇聚关系,又分为 AND 汇聚关系和 OR 汇聚关系:

AND 汇聚关系,就是所有依赖的任务都完成之后再执行;OR 汇聚关系,就是依赖的任务中有一个执行完成,就开始执行。

AND 汇聚关系由这些接口表达:

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

OR 汇聚关系由这些接口来表达:

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

五、CompletableFuture 接口精讲

1、提交执行的静态方法

方法名

描述

runAsync(Runnable runnable)

执行异步代码,使用 ForkJoinPool.commonPool() 作为它的线程池

runAsync(Runnable runnable, Executor executor)

执行异步代码,使用指定的线程池

supplyAsync(Supplier<U> supplier)

异步执行代码,有返回值,使用 ForkJoinPool.commonPool() 作为它的线程池

supplyAsync(Supplier<U> supplier, Executor executor)

异步执行代码,有返回值,使用指定的线程池执行

上述四个方法,都是提交任务的,runAsync 方法需要传入一个实现了 Runnable 接口的方法,supplyAsync 需要传入一个实现了 Supplier 接口的方法,实现 get 方法,返回一个值。

(1)run 和 supply 的区别

run 就是执行一个方法,没有返回值,supply 执行一个方法,有返回值。

(2)一个参数和两个参数的区别

第二个参数是线程池,如果没有传,则使用自带的 ForkJoinPool.commonPool() 作为线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)

2、串行关系 api

这些 api 之间主要是能否获得前一个任务的返回值与自己是否有返回值的区别。

api

是否可获得前一个任务的返回值

是否有返回值

thenApply

thenAccept

thenRun

不能

thenCompose

(1) thenApply 和 thenApplyAsync 使用

thenApply 和 thenApplyAsync 把两个并行的任务串行化,另一个任务在获得上一个任务的返回值之后,做一些加工和转换。它也是有返回值的。

public class BasicFuture4 {
    @Data
    @AllArgsConstructor
    @ToString
    static class Student {
        private String name;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Student> future = CompletableFuture.supplyAsync(() -> "Jack")
                .thenApply(s -> s + " Smith")
                .thenApply(String::toUpperCase)
                .thenApplyAsync(Student::new);
        System.out.println(future.get());
    }
}

结果可以看到,输入是一个字符串,拼接了一个字符串,转换成大写,new 了一个 Student 对象返回。

BasicFuture4.Student(name=JACK SMITH)

和 thenApply 一起的还有 thenAccept 和 thenRun,thenAccept 能获得到前一个任务的返回值,但是自身没有返回值;thenRun 不能获得前一个任务的返回值,自身也没有返回值。

(2)thenApply 和 thenApplyAsync 的区别

这两个方法的区别,在于谁去执行任务。如果使用 thenApplyAsync,那么执行的线程是从 ForkJoinPool.commonPool() 或者自己定义的线程池中取线程去执行。如果使用 thenApply,又分两种情况,如果 supplyAsync 方法执行速度特别快,那么 thenApply 任务就使用主线程执行,如果 supplyAsync 执行速度特别慢,就是和 supplyAsync 执行线程一样。

可以使用下面的例子演示一下:

package com.dsj361.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * @Author wangkai
 */
public class BasicFuture8 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("----------supplyAsync 执行很快");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return "1";
        }).thenApply(s -> {
            System.out.println(Thread.currentThread().getName());
            return "2";
        });
        System.out.println(future1.get());
        System.out.println("----------supplyAsync 执行很慢");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName());
            return "1";
        }).thenApply(s -> {
            System.out.println(Thread.currentThread().getName());
            return "2";
        });
        System.out.println(future2.get());
    }
}

执行结果:

----------supplyAsync 执行很快
ForkJoinPool.commonPool-worker-1
main
2
----------supplyAsync 执行很慢
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
2
(3)thenCompose 的使用

假设有两个异步任务,第二个任务想要获取第一个任务的返回值,并且做运算,我们可以用 thenCompose。此时使用 thenApply 也可以实现,看一段代码发现他们的区别:

public class BasicFuture9 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = getLastOne().thenCompose(BasicFuture9::getLastTwo);
        System.out.println(future.get());
        CompletableFuture<CompletableFuture<String>> future2 = getLastOne().thenApply(s -> getLastTwo(s));
        System.out.println(future2.get().get());
    }
    public static CompletableFuture<String> getLastOne(){
        return CompletableFuture.supplyAsync(()-> "topOne");
    }
    public static CompletableFuture<String> getLastTwo(String s){
        return CompletableFuture.supplyAsync(()-> s + "  topTwo");
    }
}

可以看到使用 thenApply 的时候,需要使用两个 get() 方法才能获取到最终的返回值,使用 thenCompose 只要一个即可。

3、And 汇聚关系 Api

(1)thenCombine 的使用

加入我们要计算两个异步方法返回值的和,就必须要等到两个异步任务都计算完才能求和,此时可以用 thenCombine 来完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
    CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
    CompletableFuture<Integer> thenComposeCount = thenComposeOne
        .thenCombine(thenComposeTwo, (s, y) -> s + y);
    thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth"));
    thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth"));
    System.out.println(thenComposeCount.get());
}

可以看到 thenCombine 第二个参数是一个 Function 函数,前面两个异步任务都完成之后,使用这个函数来完成一些运算。

(2)thenAcceptBoth

接收前面两个异步任务的结果,执行一个回调函数,但是这个回调函数没有返回值。

(3)runAfterBoth

接收前面两个异步任务的结果,但是回调函数,不接收参数,也不返回值。

4、Or 汇聚关系 Api

public class BasicFuture11 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
        CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
        CompletableFuture<Integer> thenComposeCount = thenComposeOne
                .applyToEither(thenComposeTwo, s -> s + 1);
        thenComposeOne.acceptEither(thenComposeTwo,s -> {});
        thenComposeOne.runAfterEither(thenComposeTwo,()->{});
        System.out.println(thenComposeCount.get());
    }
}
(1)applyToEither

任何一个执行完就执行回调方法,回调方法接收一个参数,有返回值

(2)acceptEither

任何一个执行完就执行回调方法,回调方法接收一个参数,无返回值

(3)runAfterEither

任何一个执行完就执行回调方法,回调方法不接收参数,也无返回值

5、处理异常

上面我们讲了如何把几个异步任务编排起来,执行一些串行或者汇聚操作。还有一个重要的地方,就是异常的处理。

先看下面的例子:

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(() -> {
        System.out.println("execute one ");
        return 100;
    })
        .thenApply(s -> 10 / 0)
        .thenRun(() -> System.out.println("thenRun"))
        .thenAccept(s -> System.out.println("thenAccept"));
    CompletableFuture.runAsync(() -> System.out.println("other"));
}

结果:

execute one 
other

可以发现,只要链条上有一个任务发生了异常,这个链条下面的任务都不再执行了。

但是 main 方法上的接下来的代码还是会执行的。

所以这个时候,需要合理的去处理异常来完成一些收尾的工作。

public class BasicFuture12 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            System.out.println("execute one ");
            return 100;
        })
                .thenApply(s -> 10 / 0)
                .thenRun(() -> System.out.println("thenRun"))
                .thenAccept(s -> System.out.println("thenAccept"))
                .exceptionally(s -> {
                    System.out.println("异常处理");
                    return null;
                });
        CompletableFuture.runAsync(() -> System.out.println("other"));
    }
}

可以使用  exceptionally 来处理异常。

使用 handle() 方法也可以处理异常。但是 handle() 方法的不同之处在于,即使没有发生异常,也会执行。

六、烧水泡茶程序的实现

1、使用 Thread 多线程和 CountDownLatch 来实现

public class MakeTee {
    private static CountDownLatch countDownLatch = new CountDownLatch(2);
    static class HeatUpWater implements Runnable {
        private CountDownLatch countDownLatch;
        public HeatUpWater(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println("洗水壶");
                Thread.sleep(1000);
                System.out.println("烧开水");
                Thread.sleep(5000);
                countDownLatch.countDown();
            } catch (InterruptedException e) {
            }
        }
    }
    static class PrepareTee implements Runnable {
        private CountDownLatch countDownLatch;
        public PrepareTee(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                System.out.println("洗茶壶");
                Thread.sleep(1000);
                System.out.println("洗茶杯");
                Thread.sleep(1000);
                System.out.println("拿茶叶");
                Thread.sleep(1000);
                countDownLatch.countDown();
            } catch (InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Thread(new HeatUpWater(countDownLatch) ).start();
        new Thread(new PrepareTee(countDownLatch)).start();
        countDownLatch.await();
        System.out.println("准备就绪,开始泡茶");
    }
}

这里我们使用两个线程,分别执行烧水和泡茶的程序,使用 CountDownLatch 来协调两个线程的进度,等到他们都执行完成之后,再执行泡茶的动作。

可以看到这种方法,多了很多不必要的代码,new Thread,人工维护 CountDownLatch 的进度。

2、使用 CompletableFuture 来实现

public class MakeTeeFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("洗水壶");
                Thread.sleep(1000);
                System.out.println("烧开水");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("洗茶壶");
                Thread.sleep(1000);
                System.out.println("洗茶杯");
                Thread.sleep(1000);
                System.out.println("拿茶叶");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        CompletableFuture<Void> finish = future1.runAfterBoth(future2, () -> {
            System.out.println("准备完毕,开始泡茶");
        });
        System.out.println(finish.get());
    }
}

这个程序极度简单,无需手工维护线程,给任务分配线程的工作也不需要关注。

同时语义也更加清晰,future1.runAfterBoth(future2,......) 能够清晰的表述“任务 3 要等到任务 1 和任务 2 都完成之后才能继续开始”

然后代码更加简练并且专注于业务逻辑,几乎所有的代码都是业务逻辑相关的。

七、总结

本文介绍了异步编程的概念,以及 Java8 的 CompletableFuture 是如何优雅的处理多个异步任务之间的协调工作的。CompletableFuture 能够极大简化我们对于异步任务编排的工作,Flink 在提交任务时,也是使用这种异步任务的方式,去编排提交时和提交后对于任务状态处理的一些工作的。

相信读了本篇文章,会对于你日后的工作以及阅读 Flink 源码由很大的帮助的!

谢谢!

相关文章
|
2月前
|
Java API 调度
Java 多线程编程详解
《Java多线程编程详解》深入浅出地讲解了Java平台下的多线程核心概念、API使用及最佳实践。从基础理论到实战案例,本书帮助读者掌握并发编程技巧,提升软件开发中的效率与性能,是Java开发者不可或缺的参考指南。
58 6
|
2月前
|
安全 Java 调度
理解 Java 中的多线程编程
本文深入探讨了Java中的多线程编程,涵盖线程创建与管理、同步机制、锁及死锁避免策略。介绍了通过继承`Thread`类或实现`Runnable`接口创建线程的方法,并讨论了线程的生命周期状态。此外,还讲解了如何使用`ExecutorService`线程池以及`java.util.concurrent`包中的工具类来简化并发编程。理解这些概念和技术,有助于开发高效稳定的多线程应用程序。
|
5月前
|
安全 Java API
如何在Java中实现多线程编程
如何在Java中实现多线程编程
|
5月前
|
并行计算 安全 Java
如何使用Java进行异步编程
如何使用Java进行异步编程
|
7月前
|
Java 程序员 调度
Java 多线程编程
5月更文挑战第21天
|
7月前
|
消息中间件 安全 Java
理解Java中的多线程编程
【5月更文挑战第18天】本文介绍了Java中的多线程编程,包括线程和多线程的基本概念。Java通过继承Thread类或实现Runnable接口来创建线程,此外还支持使用线程池(如ExecutorService和Executors)进行更高效的管理。多线程编程需要注意线程安全、性能优化和线程间通信,以避免数据竞争、死锁等问题,并确保程序高效运行。
|
7月前
|
Java 程序员
Java中的多线程编程
本文将深入探讨Java中的多线程编程,包括线程的创建、启动、控制和同步等关键技术。我们将通过实例代码演示如何在Java中实现多线程,并讨论多线程编程的优势和挑战。
|
7月前
|
Java
Java中的多线程编程实现
【4月更文挑战第17天】本文将深入探讨Java编程语言中多线程编程的实现方法。我们将详细讨论如何创建和管理线程,以及如何使用同步机制来避免常见的并发问题。我们还将介绍一些高级主题,如线程池和Future接口,以帮助读者更好地理解和利用Java的多线程功能。
|
安全 Java
Java的多线程编程
Java的多线程编程
|
7月前
|
Java
JAVA的多线程编程
JAVA的多线程编程
25 0