java多线程之FutureTask、Future、CompletableFuture

简介: java多线程之FutureTask、Future、CompletableFuture

前面已经在多线程创建的时候有提到Future和FutureTask的简单用法,这里详细介绍下FutureTask以及CompletableFuture

一、FutureTask

1、FutureTask简介

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行(FutureTask.run())。根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态。

1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或 执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

FutureTask的状态迁移的示意图。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞; 当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛 出异常。

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执 行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程 的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将 不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完 成状态时,执行FutureTask.cancel(…)方法将返回false。

图get方法和cancel方法的执行示意图。

2、FutureTask的使用

可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit(…)方法返回一个 FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法。除此以外,还可以单独 使用FutureTask。

当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用 FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图 同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才 能继续执行。下面是对应的示例代码。

public class Test {
    private final ConcurrentMap<Object, Future<String>> taskCache =
            new ConcurrentHashMap<Object, Future<String>>();
 
    private String executionTask(final String taskName)
            throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);
            if (future == null) {
                Callable<String> task = new Callable<String>() {
                    public String call() throws InterruptedException {
                        return taskName;
                    }
                }; //创建任务
                FutureTask<String> futureTask = new FutureTask<String>(task);
                future = taskCache.putIfAbsent(taskName, futureTask);
                if (future == null) {
                    future = futureTask;
                    futureTask.run(); // 执行任务
                }
            }
            try {
                return future.get(); // 线程在此等待任务执行完成
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);
            }
        }
    }
}

当两个线程试图同时执行同一个任务时,如果Thread 1执行1.3后Thread 2执行2.1,那么接 下来Thread 2将在2.2等待,直到Thread 1执行完1.4后Thread 2才能从2.2(FutureTask.get())返回。

3、FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中 的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS 被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch和FutureTask。

每一个基于AQS实现的同步器都会包含两种类型的操作,如下。

至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续 执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。

至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞 线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类 Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。

FutureTask的设计示意图如图

如图所示,Sync是FutureTask的内部私有类,它继承自AQS。

创建FutureTask时会创建内部 私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。 FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行 过程如下。

1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实 现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为: state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。

2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行 release操作。

3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线 程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤 醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。

4)最后返回计算的结果或抛出异常。

FutureTask.run()的执行过程如下。

1)执行在构造函数中指定的任务(Callable.call())。

2)以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置 state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为 Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。

3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执 行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg), 然后唤醒线程等待队列中的第一个线程。

4)调用FutureTask.done()。

当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态 CANCELLED,当前执行线程将到AQS的线程等待队列中等待(见下图的线程A、B、C和D)。当 某个线程执行FutureTask.run()方法或FutureTask.cancel(...)方法时,会唤醒线程等待队列的第一 个线程(见图10-16所示的线程E唤醒线程A)。

假设开始时FutureTask处于未启动状态或已启动状态,等待队列中已经有3个线程(A、B和 C)在等待。此时,线程D执行get()方法将导致线程D也到等待队列中去等待。

当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从 队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程 的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。

二、CompletableFuture

1、简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞, 可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可 以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future 接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程 的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的 CompletableFuture 类。

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提 交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方 法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获 取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或 者不断轮询才能知道任务是否完成。

Future 的主要缺点如下:

(1) 不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果, 现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直 等待它执行完成

(2) 不支持进一步的非阻塞调用 通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行 额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能

(3) 不支持链式调用 对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成 一个链式的 pipline 调用,这在 Future 中是没法实现的。

(4) 不支持多个 Future 合并比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后, 执行某些函数,是没法通过 Future 实现的。

(5) 不支持异常处理 Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题 是不好定位的。

2、使用 CompletableFuture

2.1、入门使用

场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会 阻塞,最后我们在一个子线程中使其终止。

public class Test {
    public static void main(String[] args) throws
            Exception{ CompletableFuture<String> future = new
            CompletableFuture<>();new Thread(() -> {
        try{
            System.out.println(Thread.currentThread().getName() + "子线程开始干活");
//子线程睡 5 秒
            Thread.sleep(5000);
//在子线程中完成主线程
            future.complete("success");
        }catch (Exception
                e){ e.printStackTrace();
        }
    }, "A").start();
//主线程调用 get 方法阻塞
        System.out.println("主线程调用 get 方法获取结果为: " + future.get());
        System.out.println("主线程完成,阻塞结束!!!!!!");
    }
}

结果

A子线程开始干活
主线程调用 get 方法获取结果为: success
主线程完成,阻塞结束!!!!!!

2.2、没有返回值的异步任务

public class Test {
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动干活");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace()
                ;
            }
        });
        //主线程阻塞
        future.get();
        System.out.println("主线程结束");
    }
}

2.3、有返回值的异步任务

public class Test {
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");//运行一个有返回值的异步任务
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("子线程开始任务");
                        Thread.sleep(5000);
                    } catch (Exception e)
                    {e.printStackTrace()
                    ;
                    }
                    return "子线程完成了!";
                });//主线程阻塞
        String s = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + s);
    }
 
}

2.4、线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行 化

public class Test {
    private static Integer num = 10;
    /**
     * 先对一个数加 10,然后取平方
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("加 10 任务开始");
                        num += 10;
                    } catch (Exception e) {
                        e.printStackTrace()
                        ;
                    }
                    return num;
                }).thenApply(integer ->
                {
                    return num * num;
                });
        Integer integer = future.get();
        System.out.println("主线程结束, 子线程的结果为:" + integer);
    }
}

2.5、消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果

 
public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("加 10 任务开始");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace()
                ;
            }
            return num;
        }).thenApply(integer ->
        {
            return num * num;
        }).thenAccept(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" +
                        integer);
            }
        });
    }
 
}

2.6、异常处理

exceptionally 异常处理,出现异常时触发

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {int i= 1/0;
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        }).exceptionally(ex ->
        { System.out.println(ex.getMessage());
            return -1;
        });
        System.out.println(future.get());
    }
 
}

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        }).handle((i, ex) -> {
            System.out.println("进入 handle 方法");
            if (ex != null) {
                System.out.println("发生了异常,内容为:" + ex.getMessage());
                return -1;
            } else {
                System.out.println("正常完成,内容为: " + i);
                return i;
            }
        });
        System.out.println(future.get());
    }
 
}

2.7、结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception{System.out.println("主线程开始");
//第一步加 10
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() ->
        {System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
//合并
        CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再来一个CompletableFuture
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        System.out.println(future.get());
        System.out.println(future1.get());
    }
 
}

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

 
public class Test {
    private static Integer num = 10;
 
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
//合并两个结果
        CompletableFuture<Object> future = job1.thenCombine(job2, new
                BiFunction<Integer, Integer, List<Integer>>() {
                    @Override
                    public List<Integer> apply(Integer a, Integer b) {
                        List<Integer> list = new ArrayList<>();
                        list.add(a);
                        list.add(b);
                        return list;
                    }
                });
        System.out.println("合并结果为:" + future.get());
    }
}

合并多个任务的结果allOf 与anyOf

allOf: 一系列独立的future 任务,等其所有的任务执行完后做一些事情

public class Test {
    private static Integer num = 10;
 
    /**
     * 先对一个数加 10,然后取平方
     *
     * @param args
     */
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        List<CompletableFuture> list = new ArrayList<>();
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("加 10 任务开始");
            num += 10;
            return num;
        });
        list.add(job1);
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("乘以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job2);
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("减以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job3);
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("除以 10 任务开始");
            num = num * 10;
            return num;
        });
        list.add(job4);
//多任务合并
        List<Integer> collect =
                list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
        System.out.println(collect);
    }
 
}

anyOf: 只要在多个future 里面有一个返回,整个任务就可以结束,而不需要等到每一个 future 结束

public class Test {
    private static Integer num = 10;
 
    /**
     * 先对一个数加 10,然后取平方
     *
     * @param args
     */
    public static void main(String[] args) throws
            Exception {
        System.out.println("主线程开始");
        CompletableFuture<Integer>[] futures = new CompletableFuture[4];
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("加 10 任务开始");
                num += 10;
                return num;
            } catch (Exception
                    e) {
                return 0;
            }
        });
        futures[0] = job1;
        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(2000);
                System.out.println("乘以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 1;
            }
        });
        futures[1] = job2;
        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(3000);
                System.out.println("减以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 2;
            }
        });
        futures[2] = job3;
        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(4000);
                System.out.println("除以 10 任务开始");
                num = num * 10;
                return num;
            } catch (Exception
                    e) {
                return 3;
            }
        });
        futures[3] = job4;
        CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
        System.out.println(future.get());
    }
}


相关文章
|
2天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
2天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
3天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
14 3
|
3天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
30 2
|
Java API 网络架构
20个使用 Java CompletableFuture的示例(下)
20个使用 Java CompletableFuture的示例(下)
281 1
|
Java API
20个使用 Java CompletableFuture的示例(上)
20个使用 Java CompletableFuture的示例(上)
501 0
20个使用 Java CompletableFuture的示例(上)
|
11天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
42 6
|
26天前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
24天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
26天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####