Java线程池——Executor框架

简介: Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。


Executor框架的组成:


  1. 任务(Runnable/Callable):任务通过Runnable接口或Callable接口进行定义。Runnable接口或Callable接口实现类都可以被 ThreadPoolExecutor执行
  2. 任务的执行(Executor):任务执行机制的核心接口 Executor,以及继承自Executor接口的ExecutorService接口。ThreadPoolExecutor实现了ExecutorService接口
  3. 异步的计算结果(Future):Future接口以及Future接口的实现类FutureTask类都可以代表异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutor执行后就会返回一个Future对象


一、Executor接口

线程池简化了线程的管理工作, 并且JUC提供了一种灵活的线程池实现来作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread而是Executor,Executor只定义了execute一个方法,是最顶层的接口。


 

 /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);

execute方法接受一个Runnable参数,这个方法定义为在未来的某个时间执行传入的方法,方法的运行可以在一个新的线程,在线程池或者在调用的线程中,该方法无法接收到线程的执行结果(当然Runnable接口本身也没有返回值)。


虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。


Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。


**Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。**如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式就是使用Executor。


二、ExecutorService接口

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它的run方法执行任务后不能返回一个值或者抛出一个受检查的异常。许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,它认为主入口点(call)将返回一个值,并可能抛出一个异常。


因此引入了ExecutorService,它继承自Executor,并且在其基础上增加了很多功能,可以生成用于跟踪一个或多个异步任务进度的Future的方法,以解决Executor的局限性。


Future表示一个任务的生命周期,并且提供了响应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

在Future规范中包含的隐含意义是任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态。

get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已完成那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了一场,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。


定义了以下方法:


  1. void shutdown():启动有序关机,其中执行先前提交的任务,但不接受新任务。如果调用已经关闭,则没有额外的效果。不会阻塞等待先前提交的任务执行完成


  1. List<Runnable> shutdownNow():尝试停止所有正在执行的任务,停止对等待任务的处理,并返回等待执行的任务列表。不会阻塞等待正在执行的任务终止


  • 只是尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未能响应中断的任务可能永远不会终止


  1. boolean isShutdown():如果此执行器已关闭(调用了关闭方法),则返回true


  1. 4.boolean isTerminated():如果关闭后所有任务都已完成,则返回true。注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真


  1. 5.boolean awaitTermination(long timeout, TimeUnit unit):阻塞直到所有任务在关机请求后完成执行,或者超时发生,或者当前线程被中断,以先发生的为准


  1. <T> Future<T> submit(Callablet task):提交一个带返回值的任务以供执行,并返回表示该任务的挂起结果的Future。Future的get方法将在成功完成任务时返回任务的结果


  1. <T> Future<T> submit(Runnable task, T result):提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回给定的结果


  1. Future<?> submit(Runnable task):提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回null


  1. <T> Listfuture<t> invokeAll(Collection? extends Callable<T> tasks):执行给定的任务,并在所有任务完成时返回保存其状态和结果的future列表。每个Future对象的isDone方法都会返回true。请注意,已完成的任务可以正常终止,也可以抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的


  1. <T> T invokeAny(Collection? extends Callable<T> tasks):执行给定的任务,如果有成功完成的任务,则返回成功完成的任务的结果(即不抛出异常)。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。


三、ThreadPoolExecutor类

ThreadPoolExecutor实现了ExecutorService(实际上是继承了AbstractExecutorService),为了在广泛的上下文中发挥作用,该类提供了许多可调参数和可扩展性挂钩:


  1. corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
  2. maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
  3. keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程(救急线程)会在多长时间内被销毁
  4. unit:keepAliveTime的单位
  5. workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
  6. threadFactory:线程工厂,用于创建线程,一般用默认即可
  7. handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务


1、状态

  1. RUNNING(-1<<29):接受新任务并且处理排队任务
  2. SHUTDOWN(0<<29):不接受新任务但是处理排队任务
  3. STOP(1<<29):不接受新任务也不处理排队任务,同时中断处理中的任务
  4. TYDING(2<<29):所有任务被终止,工作线程数为0之后进入该状态,并且会执行terminated()钩子函数
  5. TERMINATED(3<<29):terminated()钩子函数执行完毕后


状态单调地随时间增加,但不需要达到每个状态。


  1. RUNNING->SHUTDOWN:调用shutdown()
  2. RUNNING/SHUTDOWN->STOP:调用shutdownNow()
  3. STOP->TYDING:当队列和池都为空时
  4. TIDYING -> TERMINATED:terminated()钩子函数执行完毕后


ThreadPoolExecutor中对于状态的记录保存在一个AtomicInteger类型的变量中,其中高三位就是用于记录线程池的状态,而低的29位用于记录线程数量。


2、Worker

ThreadPoolExecutor中定义了一个私有静态类Worker,其继承自AbstractQueuedSynchronizer类,并实现了Runnable接口。其中维护了线程实例(Thread)、任务实例(Runnable)、线程任务计数器(long)变量。

这个类适当地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。这可以防止中断,这些中断旨在唤醒等待任务的工作线程,而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获得锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁状态初始化为负值,并在启动时(在runWorker中)清除它。


3、扩展

该类还定义了三个protected类型的钩子函数:


  1. beforeExecute:线程池任务运行前执行
  2. afterExecute:线程池任务运行后执行
  3. terminated:线程池退出后执行


这几个方法在ThreadPoolExecutor中为空实现。


四、ForkJoinPool类

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。


Fork就是把一个大任务切分为若干个子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。


1、工作窃取算法

ForkJoinPool是运行ForkJoinTasks的ExecutorService。**ForkJoinPool与其他类型的ExecutorService的区别主要在于使用了工作窃取。工作窃取算法是指某个线程从其他队列里窃取任务来执行。**那么为什么要使用工作窃取算法呢?**假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不干扰的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。**比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而这时它们会访问同一个队列,所以为了减少窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务,而窃取任务的线程永远从双端队列的尾部拿任务执行。


工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争


工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。


2、Fork/Join的设计

想要设计一个Fork/Join框架,需要完成两个步骤:


  1. 分割任务:需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小
  2. 执行任务并合并结果:分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。


Fork/Join使用两个类来完成以上两件事情:


  1. ForkJoinTask(抽象类):我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:


  1. RecursiveAction(抽象类):用于没有返回结果的任务
  2. RecursiveTask(抽象类):用于有返回结果的任务


2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行


任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个线程。


public class CountTask extends RecursiveTask<Integer> {
  private int start;
  private int end;
  public CountTask(int start, int end) {
    this.start = start;
    this.end = end;
  }
  @Override
  protected Integer compute() {
    int sum = 0;
    // 如果任务足够小就计算任务
    boolean canCompute = (end - start) <= THRESHOLD;
    if (canCompute) {
      for (int i = start; i <= end; i++) {
        sum += i;
      }
    } else {
      // 如果任务大于阈值,就分裂成两个子任务计算
      int middle = (start + end) / 2;
      CountTask leftTask = new CountTask(start, middle);
      CountTask rightTask = new CountTask(middle + 1, end);
      // 执行子任务
      leftTask.fork();
      rightTask.fork();
      // 等待子任务执行完,并得到其结果
      int leftResult = leftTask.join();
      int rightResult = rightTask.join();
      // 合并子任务
      sum = leftResult + rightResult;
    }
    return sum;
  }
  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    CountTask task = new CountTask(1, 4);
    // 执行一个任务
    Future<Integer> result = forkJoinPool.submit(task);
    try {
      System.out.println(result.get());
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
  }
}

RecursiveTask需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小就必须分割成两个子任务,每个子任务在调用fork方法时又会进入compute方法。最后使用join方法等待子任务执行完成并得到其结果。


ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法来获取异常。


getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。


3、执行原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。


当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。


pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后再调用ForkJoinPool的signalWork反复噶唤醒或创建一个工作线程来执行任务。


五、ScheduledThreadPool类

ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor使用任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的squenceNumber变量小的先执行)。


1、ScheduledExecutorService

ScheduledThreadPool类继承自ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。


ScheduledExecutorService接口定义了几个方法:


  1. ScheduleFuture<?> schedule(Runnable command, long delay, TimeUnit unit):提交在给定延迟后启用的一次性任务
  2. ScheduleFuture schedule(Callable command, long delay, TimeUnit unit):提交在给定延迟之后启用的带返回值的一次性任务
  3. ScheduleFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):提交一个周期性任务,任务开始时进行计时(如果任务执行时间过长甚至超过period时间,会导致任务连续执行)
  4. ScheduleFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):提交一个周期性任务,任务执行完成之后才进行计时


ScheduledFuture继承自Delayed接口和Future接口,自己本身没有定义新的方法。

Delayed接口是一个混合风格的接口,用于标记应该在给定延迟后执行的对象。这个接口定义了一个getDelay方法,用于返回剩余的延迟时间。此外这个接口继承了Comparable接口,意味着此接口的实现必须定义一个compareTo方法,该方法提供与其getDelay方法一致的排序


2、比较Timer

  • Timer对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是
  • Timer只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程
  • 在TimerTask中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机,即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行


六、Executors类

Executors是Java中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单。Executors工厂类提供的线程池有以下几种类型:


  • newCachedThreadPool():CachedThreadPool的corePoolSize 被设置为0,maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新的线程。极端情况下,这样会导致耗尽 cpu和内存资源
  • newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中包含指定数量的线程。线程数量是固定的,不会自动扩展,即没有救急线程
  • newSingleThreadExecutor():创建一个单线程的线程池。这个线程池中只包含一个线程,用于串行执行任务。适用于需要按顺序执行任务的场景
  • newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,用于定时执行任务。线程数量固定,不会自动扩展。适用于定时执行任务的场景
  • newSingleThreadScheduledExecutor():创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务
  • newWorkStealingPool(int parallelism):该线程池维护足够的线程以支持给定的并行级别,并且可以使用多个队列来减少争用。并行性级别对应于积极参与或可用参与任务处理的最大线程数。实际的线程数可以动态地增加和减少。工作窃取池不能保证所提交任务的执行顺序


除此之外还提供了创建ThreadFactory实例,将Runnable实例转换为Callable实例等方法。


相关文章
|
26天前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
26天前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
20天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
16天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
21天前
|
存储 缓存 安全
Java 集合框架优化:从基础到高级应用
《Java集合框架优化:从基础到高级应用》深入解析Java集合框架的核心原理与优化技巧,涵盖列表、集合、映射等常用数据结构,结合实际案例,指导开发者高效使用和优化Java集合。
33 4
|
23天前
|
存储 缓存 监控
Java中的线程池深度解析####
本文深入探讨了Java并发编程中的核心组件——线程池,从其基本概念、工作原理、核心参数解析到应用场景与最佳实践,全方位剖析了线程池在提升应用性能、资源管理和任务调度方面的重要作用。通过实例演示和性能对比,揭示合理配置线程池对于构建高效Java应用的关键意义。 ####
|
1月前
|
开发框架 Java 关系型数据库
Java哪个框架适合开发API接口?
在快速发展的软件开发领域,API接口连接了不同的系统和服务。Java作为成熟的编程语言,其生态系统中出现了许多API开发框架。Magic-API因其独特优势和强大功能,成为Java开发者优选的API开发框架。本文将从核心优势、实际应用价值及未来展望等方面,深入探讨Magic-API为何值得选择。
42 2
|
1月前
|
前端开发 Java 数据库连接
你不可不知道的JAVA EE 框架有哪些?
本文介绍了框架的基本概念及其在编程领域的应用,强调了软件框架作为通用、可复用的软件环境的重要性。文章分析了早期Java EE开发中使用JSP+Servlet技术的弊端,包括可维护性差和代码重用性低等问题,并阐述了使用框架的优势,如提高开发效率、增强代码规范性和可维护性及提升软件性能。最后,文中详细描述了几种主流的Java EE框架,包括Spring、Spring MVC、MyBatis、Hibernate和Struts 2,这些框架通过提供强大的功能和支持,显著提升了Java EE应用的开发效率和稳定性。
59 1
|
1月前
|
Java 数据库连接 API
Spring 框架的介绍(Java EE 学习笔记02)
Spring是一个由Rod Johnson开发的轻量级Java SE/EE一站式开源框架,旨在解决Java EE应用中的多种问题。它采用非侵入式设计,通过IoC和AOP技术简化了Java应用的开发流程,降低了组件间的耦合度,支持事务管理和多种框架的无缝集成,极大提升了开发效率和代码质量。Spring 5引入了响应式编程等新特性,进一步增强了框架的功能性和灵活性。
46 0
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
下一篇
DataWorks