java多线程之FutureTask、Future、CompletableFuture

简介: java多线程之FutureTask、Future、CompletableFuture

前面已经在多线程创建的时候有提到Future和FutureTask的简单用法,这里详细介绍下FutureTask以及CompletableFuture

一、FutureTask

1、FutureTask简介

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态。

1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或 执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

FutureTask的状态迁移的示意图。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞; 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛 出异常。

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执 行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程 的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将 不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完 成状态时,执行FutureTask.cancel(…)方法将返回false。

图get方法和cancel方法的执行示意图。

2、FutureTask的使用

可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit(…)方法返回一个 FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法。除此以外,还可以单独 使用FutureTask。

当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用 FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图 同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才 能继续执行。下面是对应的示例代码。

public class Test {
    private final ConcurrentMap<Object, Future<String>> taskCache =
            new ConcurrentHashMap<Object, Future<String>>();
 
    private String executionTask(final String taskName)
            throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);
            if (future == null) {
                Callable<String> task = new Callable<String>() {
                    public String call() throws InterruptedException {
                        return taskName;
                    }
                }; //创建任务
                FutureTask<String> futureTask = new FutureTask<String>(task);
                future = taskCache.putIfAbsent(taskName, futureTask);
                if (future == null) {
                    future = futureTask;
                    futureTask.run(); // 执行任务
                }
            }
            try {
                return future.get(); // 线程在此等待任务执行完成
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);
            }
        }
    }
}

当两个线程试图同时执行同一个任务时,如果Thread 1执行1.3后Thread 2执行2.1,那么接 下来Thread 2将在2.2等待,直到Thread 1执行完1.4后Thread 2才能从2.2(FutureTask.get())返回。

3、FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中 的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS 被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch和FutureTask。

每一个基于AQS实现的同步器都会包含两种类型的操作,如下。

至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续 执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。

至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞 线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类 Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。

FutureTask的设计示意图如图

如图所示,Sync是FutureTask的内部私有类,它继承自AQS。

创建FutureTask时会创建内部 私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。 FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行 过程如下。

1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实 现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为: state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。

2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行 release操作。

3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线 程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤 醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。

4)最后返回计算的结果或抛出异常。

FutureTask.run()的执行过程如下。

1)执行在构造函数中指定的任务(Callable.call())。

2)以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置 state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为 Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。

3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执 行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg), 然后唤醒线程等待队列中的第一个线程。

4)调用FutureTask.done()。

当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态 CANCELLED,当前执行线程将到AQS的线程等待队列中等待(见下图的线程A、B、C和D)。当 某个线程执行FutureTask.run()方法或FutureTask.cancel(...)方法时,会唤醒线程等待队列的第一 个线程(见图10-16所示的线程E唤醒线程A)。

假设开始时FutureTask处于未启动状态或已启动状态,等待队列中已经有3个线程(A、B和 C)在等待。此时,线程D执行get()方法将导致线程D也到等待队列中去等待。

当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从 队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程 的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。

二、CompletableFuture

1、简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞, 可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可 以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future 接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程 的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提 交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方 法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获 取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或 者不断轮询才能知道任务是否完成。

Future 的主要缺点如下:

(1) 不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果, 现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直 等待它执行完成

(2) 不支持进一步的非阻塞调用 通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行 额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能

(3) 不支持链式调用 对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成 一个链式的 pipline 调用,这在 Future 中是没法实现的。

(4) 不支持多个 Future 合并比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后, 执行某些函数,是没法通过 Future 实现的。

(5) 不支持异常处理 Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题 是不好定位的。

2、使用 CompletableFuture

2.1、入门使用

场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。

public class Test {
    public static void main(String[] args) throws
            Exception{ CompletableFuture<String> future = new
            CompletableFuture<>();new Thread(() -> {
        try{
            System.out.println(Thread.currentThread().getName() + "子线程开始干活");
//子线程睡 5 秒
            Thread.sleep(5000);
//在子线程中完成主线程
            future.complete("success");
        }catch (Exception
                e){ e.printStackTrace();
        }
    }, "A").start();
//主线程调用 get 方法阻塞
        System.out.println("主线程调用 get 方法获取结果为: " + future.get());
        System.out.println("主线程完成,阻塞结束!!!!!!");
    }
}

结果

A子线程开始干活
主线程调用 get 方法获取结果为: success
主线程完成,阻塞结束!!!!!!

2.2、没有返回值的异步任务

public class Test {
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace()
                ;
            }
        });
        //主线程阻塞
        future.get();
        System.out.println("主线程结束");
    }
}

2.3、有返回值的异步任务

public class Test {
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");//运行一个有返回值的异步任务
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("子线程开始任务");
                        Thread.sleep(5000);
                    } catch (Exception e)
                    {e.printStackTrace()
                    ;
                    }
                    return "子线程完成了!";
                });//主线程阻塞
        String s = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + s);
    }
 
}

2.4、线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行 化

public class Test {
    private static Integer num = 10;
    /**
     * 先对一个数加 10,然后取平方
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("加 10 任务开始");
                        num += 10;
                    } catch (Exception e) {
                        e.printStackTrace()
                        ;
                    }
                    return num;
                }).thenApply(integer ->
                {
                    return num * num;
                });
        Integer integer = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + integer);
    }
}

2.5、消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果

 
public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("加 10 任务开始");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace()
                ;
            }
            return num;
        }).thenApply(integer ->
        {
            return num * num;
        }).thenAccept(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" +
                        integer);
            }
        });
    }
 
}

2.6、异常处理

exceptionally 异常处理,出现异常时触发

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {int i= 1/0;
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        }).exceptionally(ex ->
        { System.out.println(ex.getMessage());
            return -1;
        });
        System.out.println(future.get());
    }
 
}

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        }).handle((i, ex) -> {
            System.out.println("进入 handle 方法");
            if (ex != null) {
                System.out.println("发生了异常,内容为:" + ex.getMessage());
                return -1;
            } else {
                System.out.println("正常完成,内容为: " + i);
                return i;
            }
        });
        System.out.println(future.get());
    }
 
}

2.7、结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");
//第一步加 10
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
//合并
        CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再来一个CompletableFuture
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        System.out.println(future.get());
        System.out.println(future1.get());
    }
 
}

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

 
public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
//合并两个结果
        CompletableFuture<Object> future = job1.thenCombine(job2, new
                BiFunction<Integer, Integer, List<Integer>>() {
                    @Override
                    public List<Integer> apply(Integer a, Integer b) {
                        List<Integer> list = new ArrayList<>();
                        list.add(a);
                        list.add(b);
                        return list;
                    }
                });
        System.out.println("合并结果为:" + future.get());
    }
}

合并多个任务的结果allOf 与anyOf

allOf: 一系列独立的future 任务,等其所有的任务执行完后做一些事情

public class Test {
    private static Integer num = 10;
 
    /**
     * 先对一个数加 10,然后取平方
     *
     * @param args
     */
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        List<CompletableFuture> list = new ArrayList<>();
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        list.add(job1);
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job2);
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("减以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job3);
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("除以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job4);
//多任务合并
        List<Integer> collect =
                list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
        System.out.println(collect);
    }
 
}

anyOf: 只要在多个future 里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束

public class Test {
    private static Integer num = 10;
 
    /**
     * 先对一个数加 10,然后取平方
     *
     * @param args
     */
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer>[] futures = new CompletableFuture[4];
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("加 10 任务开始");
                num += 10;
                return num;
            } catch (Exception
                    e) {
                return 0;
            }
        });
        futures[0] = job1;
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(2000);
                System.out.println("乘以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 1;
            }
        });
        futures[1] = job2;
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(3000);
                System.out.println("减以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 2;
            }
        });
        futures[2] = job3;
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(4000);
                System.out.println("除以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 3;
            }
        });
        futures[3] = job4;
        CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
        System.out.println(future.get());
    }
}


相关文章
|
1天前
|
Java
阅读《代码整洁之道》总结(1),java多线程面试
阅读《代码整洁之道》总结(1),java多线程面试
|
1天前
|
缓存 安全 Java
7张图带你轻松理解Java 线程安全,java缓存机制面试
7张图带你轻松理解Java 线程安全,java缓存机制面试
|
2天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【5月更文挑战第15天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将通过实例分析,理解线程安全的重要性,并学习如何通过各种技术和策略来实现它。同时,我们也将探讨如何在保证线程安全的同时,提高程序的性能。
|
2天前
|
消息中间件 并行计算 Java
Java中的多线程编程:基础知识与实践
【5月更文挑战第15天】 在现代计算机编程中,多线程是一个复杂但必不可少的概念。特别是在Java这种广泛使用的编程语言中,理解并掌握多线程编程是每个开发者必备的技能。本文将深入探讨Java中的多线程编程,从基础概念到实际应用场景,为读者提供全面的理论支持和实践指导。
|
7月前
|
设计模式 缓存 Java
Java多线程线程池:提升应用性能的终极利器
Java多线程线程池:提升应用性能的终极利器
574 0
|
2天前
|
Java 数据库
【Java多线程】对线程池的理解并模拟实现线程池
【Java多线程】对线程池的理解并模拟实现线程池
16 1
|
2天前
|
Java 调度
Java多线程:什么是线程池(ThreadPool)?
Java多线程:什么是线程池(ThreadPool)?
49 0
|
2天前
|
监控 Java 调度
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
Java多线程实战-从零手搓一个简易线程池(四)线程池生命周期状态流转实现
|
2天前
|
设计模式 Java
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现
Java多线程实战-从零手搓一个简易线程池(三)线程工厂,核心线程与非核心线程逻辑实现
|
2天前
|
Java 测试技术
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义