如何判断线程池任务执行完?

简介: 如何判断线程池任务执行完?

无论是在项目开发中,还是在面试中过程中,总会被问到或使用到并发编程来完成项目中的某个功能。

例如某个复杂的查询,无法使用一个查询语句来完成此功能,此时我们就需要执行多个查询语句,然后再将各自查询的结果,组装之后返回给前端了,那么这种场景下,我们就必须使用线程池来进行并发查询了。

PS:磊哥做的最复杂的查询,总共关联了 21 张表,在和产品及需求方的沟通多次沟通下,才将查询的业务从 21 张表,降到了至少要查询 12 张表(非常难搞),那么这种场景下是无法使用一个查询语句来实现的,那么并发查询是必须要给安排上的。

1.需求分析

线程池的使用并不复杂,麻烦的是如何判断线程池中的任务已经全部执行完了?因为我们要等所有任务都执行完之后,才能进行数据的组装和返回,所以接下来,我们就来看如何判断线程中的任务是否已经全部执行完?

2.实现概述

判断线程池中的任务是否执行完的方法有很多,比如以下几个:

  1. 使用 getCompletedTaskCount() 统计已经执行完的任务,和 getTaskCount() 线程池的总任务进行对比,如果相等则说明线程池的任务执行完了,否则既未执行完。
  2. 使用 FutureTask 等待所有任务执行完,线程池的任务就执行完了。
  3. 使用 CountDownLatch 或 CyclicBarrier 等待所有线程都执行完之后,再执行后续流程。

具体实现代码如下。

3.具体实现

3.1 统计完成任务数

通过判断线程池中的计划执行任务数和已完成任务数,来判断线程池是否已经全部执行完,如果计划执行任务数=已完成任务数,那么线程池的任务就全部执行完了,否则就未执行完。

示例代码如下:

private static void isCompletedByTaskCount(ThreadPoolExecutor threadPool) {
   
   
    while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) {
   
   
    }
}

以上程序执行结果如下:

方法说明

  • getTaskCount():返回计划执行的任务总数。由于任务和线程的状态可能在计算过程中动态变化,因此返回的值只是一个近似值。
  • getCompletedTaskCount():返回完成执行任务的总数。因为任务和线程的状态可能在计算过程中动态地改变,所以返回的值只是一个近似值,但是在连续的调用中并不会减少。

缺点分析

此判断方法的缺点是 getTaskCount() 和 getCompletedTaskCount() 返回的是一个近似值,因为线程池中的任务和线程的状态可能在计算过程中动态变化,所以它们两个返回的都是一个近似值。

3.2 FutureTask

FutrueTask 的优势是任务判断精准,调用每个 FutrueTask 的 get 方法就是等待该任务执行完,如下代码所示:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 使用 FutrueTask 等待线程池执行完全部任务
 */
public class FutureTaskDemo {
   
   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 创建任务
        FutureTask<Integer> task1 = new FutureTask<>(() -> {
   
   
            System.out.println("Task 1 start");
            Thread.sleep(2000);
            System.out.println("Task 1 end");
            return 1;
        });
        FutureTask<Integer> task2 = new FutureTask<>(() -> {
   
   
            System.out.println("Task 2 start");
            Thread.sleep(3000);
            System.out.println("Task 2 end");
            return 2;
        });
        FutureTask<Integer> task3 = new FutureTask<>(() -> {
   
   
            System.out.println("Task 3 start");
            Thread.sleep(1500);
            System.out.println("Task 3 end");
            return 3;
        });
        // 提交三个任务给线程池
        executor.submit(task1);
        executor.submit(task2);
        executor.submit(task3);

        // 等待所有任务执行完毕并获取结果
        int result1 = task1.get();
        int result2 = task2.get();
        int result3 = task3.get();
        System.out.println("Do main thread.");
    }
}

以上程序的执行结果如下:

image.png

3.3 CountDownLatch和CyclicBarrier

CountDownLatch 和 CyclicBarrier 类似,都是等待所有任务到达某个点之后,再进行后续的操作,如下图所示:

CountDownLatch 使用的示例代码如下:

public static void main(String[] args) throws InterruptedException {
   
   
    // 创建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
        0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));
    final int taskCount = 5;    // 任务总数
    // 单次计数器
    CountDownLatch countDownLatch = new CountDownLatch(taskCount); // ①
    // 添加任务
    for (int i = 0; i < taskCount; i++) {
   
   
        final int finalI = i;
        threadPool.submit(new Runnable() {
   
   
            @Override
            public void run() {
   
   
                try {
   
   
                    // 随机休眠 0-4s
                    int sleepTime = new Random().nextInt(5);
                    TimeUnit.SECONDS.sleep(sleepTime);
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                }
                System.out.println(String.format("任务%d执行完成", finalI));
                // 线程执行完,计数器 -1
                countDownLatch.countDown();  // ②
            }
        });
    }
    // 阻塞等待线程池任务执行完
    countDownLatch.await();  // ③
    // 线程池执行完
    System.out.println();
    System.out.println("线程池任务执行完成!");
}

代码说明:以上代码中标识为 ①、②、③ 的代码行是核心实现代码,其中:

① 是声明一个包含了 5 个任务的计数器;

② 是每个任务执行完之后计数器 -1;

③ 是阻塞等待计数器 CountDownLatch 减为 0,表示任务都执行完了,可以执行 await 方法后面的业务代码了。

以上程序的执行结果如下:

image.png

缺点分析

CountDownLatch 缺点是计数器只能使用一次,CountDownLatch 创建之后不能被重复使用。

CyclicBarrier 和 CountDownLatch 类似,它可以理解为一个可以重复使用的循环计数器,CyclicBarrier 可以调用 reset 方法将自己重置到初始状态,CyclicBarrier 具体实现代码如下:

public static void main(String[] args) throws InterruptedException {
   
   
    // 创建线程池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
        0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1024));
    final int taskCount = 5;    // 任务总数
    // 循环计数器 ①
    CyclicBarrier cyclicBarrier = new CyclicBarrier(taskCount, new Runnable() {
   
   
        @Override
        public void run() {
   
   
            // 线程池执行完
            System.out.println();
            System.out.println("线程池所有任务已执行完!");
        }
    });
    // 添加任务
    for (int i = 0; i < taskCount; i++) {
   
   
        final int finalI = i;
        threadPool.submit(new Runnable() {
   
   
            @Override
            public void run() {
   
   
                try {
   
   
                    // 随机休眠 0-4s
                    int sleepTime = new Random().nextInt(5);
                    TimeUnit.SECONDS.sleep(sleepTime);
                    System.out.println(String.format("任务%d执行完成", finalI));
                    // 线程执行完
                    cyclicBarrier.await(); // ②
                } catch (InterruptedException e) {
   
   
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
   
   
                    e.printStackTrace();
                }
            }
        });
    }
}

以上程序的执行结果如下:
image.png

方法说明

CyclicBarrier 有 3 个重要的方法:

  1. 构造方法:构造方法可以传递两个参数,参数 1 是计数器的数量 parties,参数 2 是计数器为 0 时,也就是任务都执行完之后可以执行的事件(方法)。
  2. await 方法:在 CyclicBarrier 上进行阻塞等待,当调用此方法时 CyclicBarrier 的内部计数器会 -1,直到发生以下情形之一:
    1. 在 CyclicBarrier 上等待的线程数量达到 parties,也就是计数器的声明数量时,则所有线程被释放,继续执行。
    2. 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
    3. 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    4. 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
    5. 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  3. reset 方法:使得CyclicBarrier回归初始状态,直观来看它做了两件事:
    1. 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
    2. 将是否破损标志位 broken 置为 false。

优缺点分析

CyclicBarrier 从设计的复杂度到使用的复杂度都高于 CountDownLatch,相比于 CountDownLatch 来说它的优点是可以重复使用(只需调用 reset 就能恢复到初始状态),缺点是使用难度较高。

小结

在实现判断线程池任务是否执行完成的方案中,通过统计线程池执行完任务的方式(实现方法 1),以及实现方法 3(CountDownLatch 或 CyclicBarrier)等统计,都是“不记名”的,只关注数量,不关注(具体)对象,所以这些方式都有可能受到外界代码的影响,因此使用 FutureTask 等待具体任务执行完的方式是最推荐的判断方法。


本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。

相关文章
|
24天前
|
存储 Java 数据库
如何处理线程池关闭时未完成的任务?
总之,处理线程池关闭时未完成的任务需要综合考虑多种因素,并根据实际情况选择合适的处理方式。通过合理的处理,可以最大程度地减少任务丢失和数据不一致等问题,确保系统的稳定运行和业务的顺利开展。
114 64
|
24天前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
116 62
|
18天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
47 12
|
5月前
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
2月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
84 5
|
4月前
|
前端开发 JavaScript 大数据
React与Web Workers:开启前端多线程时代的钥匙——深入探索计算密集型任务的优化策略与最佳实践
【8月更文挑战第31天】随着Web应用复杂性的提升,单线程JavaScript已难以胜任高计算量任务。Web Workers通过多线程编程解决了这一问题,使耗时任务独立运行而不阻塞主线程。结合React的组件化与虚拟DOM优势,可将大数据处理等任务交由Web Workers完成,确保UI流畅。最佳实践包括定义清晰接口、加强错误处理及合理评估任务特性。这一结合不仅提升了用户体验,更为前端开发带来多线程时代的全新可能。
95 1
|
4月前
|
存储 监控 Java
|
5月前
|
Java Linux
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
|
4月前
|
Cloud Native Java 调度
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
|
4月前
|
Java 测试技术 PHP
父子任务使用不当线程池死锁怎么解决?
在Java多线程编程中,线程池有助于提升性能与资源利用效率,但若父子任务共用同一池,则可能诱发死锁。本文通过一个具体案例剖析此问题:在一个固定大小为2的线程池中,父任务直接调用`outerTask`,而`outerTask`再次使用同一线程池异步调用`innerTask`。理论上,任务应迅速完成,但实际上却超时未完成。经由`jstack`输出的线程调用栈分析发现,线程陷入等待状态,形成“死锁”。原因是子任务需待父任务完成,而父任务则需等待子任务执行完毕以释放线程,从而相互阻塞。此问题在测试环境中不易显现,常在生产环境下高并发时爆发,重启或扩容仅能暂时缓解。