java并发编程笔记--Executor相关API整理

简介: Executor框架是concurrent包提供的用于执行线程任务的框架,它基于生产者-消费者模式实现,将提交任务的线程和执行任务的线程解耦。提交任务的线程视作生产者,执行任务的线程视作消费者。任务的执行策略可以通过定制不同的消费者实现,比如:任务可以同步执行,也可以异步执行;任务可以按照编排优先级,高优先级的任务可以优先执行;任务可以延迟执行或者按周期执行...这些实现对于生产者而言透明,生产者无需关注消费者的具体实现,仅需要按照业务需求提交任务即可。

Executor框架是concurrent包提供的用于执行线程任务的框架,它基于生产者-消费者模式实现,将提交任务的线程和执行任务的线程解耦。提交任务的线程视作生产者,执行任务的线程视作消费者。任务的执行策略可以通过定制不同的消费者实现,比如:任务可以同步执行,也可以异步执行;任务可以按照编排优先级,高优先级的任务可以优先执行;任务可以延迟执行或者按周期执行...这些实现对于生产者而言透明,生产者无需关注消费者的具体实现,仅需要按照业务需求提交任务即可。

类图

image

Executor接口定义了concurrent包线程任务执行的入口,其扩展接口和实现类形成了一套满足线程任务执行的通用框架。

Executor接口

image

一个线程任务的执行入口,包含一个execute方法,接收Runnable参数,用于解耦线程任务的提交和执行。execute方法依赖于具体的实现,即可以在提交任务的线程中执行线程任务,也可以启动新的线程异步执行,或者将提交的任务进行编排按照一定规则执行。

ExecutorService接口

image

作为Executor的扩展,分离了任务的执行(execute)和提交方法(submit/invoke*)。包含3种状态:

  • 运行(running):ExecutorService创建后的状态,表示执行器处于正常状态,可以接受任务;
  • 关闭(shutdown):执行shutdown/shutdownNow后的状态,此时不再接受新任务,且等待ExecutorService中的工作线程终止;
  • 终止(terminated):ExecutorService中的所有工作线程终止运行,可通过isTerminated判断是否终止;通过awaitTermination同步等待终止;

ExecutorService定义了多个方法实现对线程任务状态更加精细的控制,扩展方法包括:

  • void shutdown():触发Executor关闭操作,Executor在关闭前会等待已经提交的任务执行完成后才关闭。shutdown方法触发关闭操作后即刻返回,并不会等待任务执行完成才返回;
  • List shutdownNow():触发Executor关闭操作,停止所有正在执行的任务,并返回所有等待执行的任务列表;
  • boolean isShutdown():是否已经触发Executor的关闭操作,当执行shutdown/shutdownNow方法后,该方法返回true;
  • boolean isTerminated():是否已经结束关闭操作;当关闭操作触发时,Executor并不一定立即关闭,可能需要等待已经提交任务执行完成后才关闭,因此只有当Executor真正执行完关闭操作后,该方法返回true;
  • boolean awaitTermination(long timeout, TimeUnit unit):调用该方法的线程会阻塞等待,直到Executor执行完关闭操作;阻塞期间可能因超时(返回false)或者被中断(抛出InterruptedException异常)返回;
  • Future submit(Callable task):提交一个线程任务,返回一个Futrue对象,以便接收和处理线程执行结果;
  • Future submit(Runnable task, T result):同上,因为Runnable的run方法没有返回值,故单独用一个参数存放执行结果;
  • Future<?> submit(Runnable task):同上;
  • List> invokeAll(Collection<? extends Callable> tasks):批量提交任务,同步等待全部任务执行完并返回Future对象集合存放每个任务的执行结果;
  • List> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit):同上,允许设置超时时间;
  • T invokeAny(Collection<? extends Callable> tasks):执行所提交的任务,同步等待第一个已经执行完成的任务并返回结果,其余未完成任务将被取消执行;
  • T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit):同上,允许设置超时时间;

面试问题:Runnable接口和Callable接口有何异同?
相同点:均被设计用来抽象线程任务的执行,将任务的提交和任务的执行解耦,使得任务的提交和执行可以放在不同的线程中执行;
不同点:Callable接口允许返回计算结果,也可以抛出异常,在任务执行过程中可以异步的捕获异常,也可以获取线程执行结果;而Runnable接口不允许这样做,通常异常处理需要放在接口内实现,计算结果也无法直接返回,需要借助消息队列等其它数据结构和组件实现;

注意:是用execute方法和submit方法提交任务在异常处理上存在区别:
execute提交的任务在执行时如果抛出未捕获异常,则可以由java.lang.Thread.UncaughtExceptionHandler捕获处理。而submit提交的任务执行过程中如果抛出未捕获异常,将被视为任务执行结果的一部分,异常通过返回Future.get封装在ExecutionException中抛出。

ScheduledExecutorService接口

image

定时任务执行入口,允许延迟或者周期性调度线程任务,类似java.util.Timer的调度功能。

面试问题:java.util.Timer和ScheduledExecutorService接口的实现有何异同?

image

  • 提交任务参数不同:Timer的任务参数必须是TimerTask类型;而ScheduledExecutorService参数相对灵活,可以是Runnable也可以是Callable类型;
  • 调度线程不同:一个Timer对象包含一个TimerThread和一个TaskQueue对象,TimerThread线程用于调度所有的TimerTask任务,TaskQueue负责存放当前Timer对象等待调度的所有任务。TaskQueue是一个优先队列,基于堆实现,调度效率为log(n),任务通过下一次执行时间进行排序,以便保证最近的任务最先被调度执行。在创建Timer之后,TimerThread即已创建并且调用start方法启动,Timer调度任务是逐个调度,如果队列中的任务执行时间过长,将会导致整个任务序列执行时间的延迟。ScheduledExecutorService的实现类对执行任务的线程控制更加灵活,如ScheduledThreadPoolExecutor使用线程池来调度任务,可以同时调度多个任务,减少了因为任务排队执行而造成的延迟;当ScheduledThreadPoolExecutor指定的线程池大小为1时,效果等同于使用Timer;
  • 时间单位不同:ScheduledExecutorService使用TimeUnit指定时间,更加灵活;
  • 调度时间基准不同:Timer基于绝对时间调度,因而对操作系统的时钟变化敏感,一旦操作系统调整时钟,可能导致任务执行时间变化;ScheduledThreadPoolExecutor基于相对时间调度,因而对于系统时钟变化不敏感,具有更好的兼容性;
  • Timer存在线程泄漏问题:Timer对于执行任务抛出的异常不作捕获处理,一旦任务抛出异常,将导致整个Timer不可用,累及其它待执行任务;而ScheduledThreadPoolExecutor可以正确处理任务抛出的异常;

ScheduledExecutorService接口方法包括:

  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):以给定延迟调度一个一次性的任务,当任务执行完成后,ScheduledFuture<?>的get方法返回null;
  • ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit):同上,在任务执行完后,返回Callable执行结果;
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):周期性的执行给定任务。任务第一次执行时延迟initialDelay时长,然后按照( initialDelay + period*( n - 1 ) ,n表示任务执行次数)固定周期执行;如果单次任务执行时间超过period,则下一次执行将会等待当次执行完成后立即执行,而不会并行执行;
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):周期性的执行给定任务。任务第一次执行时延迟initialDelay时长,然后按照( 上一次结束时间 + period )周期性执行,因而该方法执行任务的周期并不固定,会根据任务执行时间的长短变化;上一次任务结束到下一次任务执行开始,总是相隔period时长;

面试问题:ScheduledExecutorService接口的scheduleAtFixedRate与scheduleWithFixedDelay区别?
解释如上

AbstractExecutorService抽象类

image

作为ScheduledExecutorService的默认实现,实现了提交任务submit/invoke*方法,使用RunnableFuture作为返回结果(FutureTask作为默认实现类);未实现execute方法,等待子类实现;

ThreadPoolExecutor类

基于线程池的ExecutorService的实现,底层通过线程池并发执行任务,从而提高执行效率,ThreadPoolExecutor本身即为一个线程池的实现,线程池中用于执行任务的线程称为工作线程。根据不同的场景,线程池的执行策略可能不同,需要通过一些列参数进行控制,包括:corePoolSize(核心工作线程数),maximumPoolSize(最大工作线程数),keepAliveTime,workQueue(任务队列),threadFactory(线程创建类),handler(提交任务被拒绝后的处理策略)。为了简化操作,concurrent包的Executors类提供了几种常用线程池的创建方法,用来简化操作。

corePoolSize与maximumPoolSize参数

1)当poolSize < corePoolSize(poolSize为线程池当前的大小,通过getPoolSize方法获取),线程池会新建一个core线程用来执行新来的任务,即便此时线程池中有空闲的线程;
2)当corePoolSize <= poolSize <= maximumPoolSize,且任务队列未满,线程池将会优先将新来的任务放入workQueue中等待core线程执行;
3)当corePoolSize <= poolSize < maximumPoolSize,且任务队列已满,线程池新建一个线程执行新到任务;
4)当corePoolSize和maximumPoolSize设置相同时,则线程池大小固定;
5)corePoolSize和maximumPoolSize可以被设置为整型的上限Integer.MAX_VALUE,然而线程池大小的设置依赖于业务需要,同时也受限于所在物理设备的资源(CPU个数、内存大小、文件句柄数等),因此一般情况下,线程池大小不应该设置过大。如果需要处理海量的并发任务,则考虑使用分布式方法来处理;
6)corePoolSize和maximumPoolSize可以通过构造函数设置,也可以通过setter方法设置;

prestartCoreThread / prestartAllCoreThreads方法

当线程池大小poolSize

threadFactory参数

image

创建线程时可以指定线程名称,所属的ThreadGroup,线程的优先级,是否是守护线程等。有时创建线程时,我们希望这些参数都能够遵守一定规则,比如:创建的线程都归属于同一个ThreadGroup、线程名称以相同的前缀开始...ThreadFactory接口的定义就是为了创建某一类线程,通过不同的实现类实现对不同类型线程测创建。ThreadPoolExecutor默认使用DefaultThreadFactory作为线程创建的工厂类,该类创建的线程都归属于同一个ThreadGroup,线程名以统一的前缀开始,优先级都是Thread.NORM_PRIORITY,非守护线程。可以在构造函数中通过threadFactory参数进行定制。

注意:当ThreadFactory创建线程返回null时,ThreadPoolExecutor将继续运行,但是可能无法正常执行任务。

keepAliveTime参数

如果线程池的poolSize>corePoolSize,并且有空闲的非core线程,则当非core线程空闲时间达到keepAliveTime时,将会被回收,以便减少资源消耗。该参数可以通过构造函数指定,也可以通过setter方法动态指定。当设置为Long.MAX_VALUE时,表示线程永久保留。当keepAliveTime不为0,且同时设置allowCoreThreadTimeOut为true时,将会对core线程也执行相同的回收策略。

注:allowCoreThreadTimeOut设置为true还有一项额外的作用,即可以确保当ThreadPoolExecutor需要被terminate时,所有的工作线程均被shutdown。

workQueue参数

ThreadPoolExecutor中的workQueue用于暂存等待执行的任务。workQueue可以是任意BlockingQueue类型。workQueue的使用同poolSize相关。
1)当poolSize < corePoolSize时,线程池将会通过新增core线程的方式处理新来的任务,而不会放入workQueue中;
2)当corePoolSize <= poolSize <= maximumPoolSize时,且workQueue未满时,线程池将会优先将新来的任务放入workQueue中等待core线程执行;
3)当corePoolSize <= poolSize < maximumPoolSize,且workQueue已满,线程池会新建非core线程处理新来的任务;

workQueue常用的3种策略:

  • 直接传递任务:使用SynchronousQueue,SynchronousQueue可以理解为一个size为0的阻塞队列,每一个任务提交都必须等待有对应的任务获取操作,省去了中间的存储和查找操作。如果当前池中没有线程接收正在提交的任务,则会新建一个线程,直到工作线程数达到maximumPoolSizes后开始驳回任务。为了避免任务驳回,通常需要设置maximumPoolSizes为不限制。当提交任务的速率大于处理任务的速率时,将会导致线程数的无限制增加。
  • 无界队列:使用LinkedBlockingQueue等无界队列,当poolSize达到corePoolSize,并且没有core线程空闲时,新任务将会进入队列等待,由于队列长度没有限制,因而线程池中最多只会有corePoolSize个线程,此时maximumPoolSizes参数无效。该策略适合于任务之前没有依赖的情况,以确保任务执行不会相互影响,比如处理http请求时,即可使用该策略。当提交任务的速率大于处理任务的速率时,将会导致任务队列中任务的积压。
  • 有界队列:使用ArrayBlockingQueue等有界队列,可以避免大量创建线程或者任务积压造成资源耗尽。但是,需要权衡queueSize和maximumPoolSizes设置。比如:I/O密集型任务可以设置较小的queueSize和较大的
    maximumPoolSizes,以便CPU资源被充分利用,避免因为等待I/O操作造成CPU资源不能够被充分利用。

注:只要是BlockingQueue的实现类可以被ThreadPoolExecutor使用,因而可以根据各种BlockingQueue的子类特性影响任务执行的顺序,比如:使用PriorityBlockingQueue能够将任务按照优先级排序,使得高优先级的任务能够被优先执行。使用DelayQueue则适合于执行延迟任务,队列中的任务会按照延迟时间排序,仅当任务超过延迟时间后,才可以执行take操作;

ThreadPoolExecutor提供了对workQueue元素操作的外部方法,如getQueue()允许直接获取workQueue的引用,方便监控和调试;remove和purge方法允许删除workQueue中的元素。

RejectedExecutionHandler参数

image

RejectedExecutionHandler接口定义了当任务被线程池驳回时,需要执行的处理策略。任务驳回包括2种情况:

  • 线程池已经shutdown;
  • 线程池能够处理的任务已经达到饱和,如:poolSize达到maximumPoolSizes,且workQueue已满,此时新来的任务将会被驳回;

concurrent包提供4种处理策略,用户也可以通过实现RejectedExecutionHandler接口自定义处理策略:

image

  • AbortPolicy:中止策略,直接抛出RejectedExecutionException异常;
  • CallerRunsPolicy:直接在调用execute方法的线程中执行被驳回的任务;该策略由于占用了任务提交线程,因而会降低任务提交的速率。等同于任务提交线程直接执行任务;
  • DiscardPolicy:丢弃策略,将会丢弃被驳回的任务;
  • DiscardOldestPolicy:丢弃最老任务策略,将会删除workQueue头部的任务,然后重新提交被驳回任务;如果任务一直被驳回,该策略将会一直重复执行;

注意:使用CallerRunsPolicy策略可能引起线程安全问题
场景:假设任务访问了某种非线程安全的资源,为了保证线程安全,使用poolSize为1的线程池串行处理任务,如果任务被驳回,触发驳回策略使用外部线程重新执行,则可能和工作线程同时访问非线程安全的资源,引发线程安全问题。

回调方法

ThreadPoolExecutor提供了beforeExecute,afterExecute,terminated等回调用法,用于根据线程池的反馈执行相关操作。这些方法可以通过子类重写。

  • beforeExecute/afterExecute方法,会在每个任务执行前/后被调用,可以用作重置执行参数、打印日志、统计执行数据等;
  • terminated方法,在ThreadPoolExecutor被termniate之后调用,可用于执行资源清理等操作;

线程池大小&工作队列大小设置的考量

很难精确的给出线程池应该设置的大小,需要根据业务运行的情况不断调整。但是我们也可以根据一些经验给出大概的大小。

CPU密集型:较小的poolSize和较大的workQueue;
因为CPU密集型任务,资源的瓶颈主要在CPU,较小的poolSize可以保障cpu资源充分被用于任务执行,而非进行上下文切换;设置较大的workQueue可以确保充分接收更多的任务,提高吞吐。poolSize通常设置Ncpu+1个线程比较合适;

I/O密集型:设置较大的poolSize和较小的workQueue;
I/O密集型任务的瓶颈主要在I/O的等待,任务的I/O等待降低了工作线程处理速率,可能会导致后续任务的延迟,因此提高任务处理的并发度,能够有效减少I/O等待,故设置一个较大的poolSize。同时,为了保障任务不因为I/O等待而过多积压,造成时延增加,故设置一个较小的workQueue。

经验公式:Nthread = Ncpu Ucpu ( 1 + W/C )
Nthread,计算得出的poolSize大小;
Ncpu,可使用的cpu个数/核数;
Ucpu,期望最高的cpu使用率,考虑需要限制CPU使用率的场景,不能够总是将CPU使用率估计为100%,因而使用此参数;
W,线程的等待时间,需要自己统计求平均值;
C,线程的运行时间,需要自己统计求平均值;

注:决定线程池数量上限的因素还包括一些内存、文件句柄、socket句柄、数据库连接等资源的限制,在设置时也应该考虑;

ForkJoinPool类

  • 主要用于执行ForkJoinTask任务,同时支持非ForkJoinTask类型的任务。ForkJoinPool的优点在于实现了『工作窃取算法』:池中的工作线程不仅可以执行外部的线程任务,同时也可以执行其它运行中的任务所产生的子任务。这种策略适合于处理任务产生大量子任务的场景,也适合于处理由外部提交的大量细小的任务;
  • 静态方法commonPool返回一个通用的ForkJoinPool,可用来执行那些没有明确指定线程池的任务;
  • 线程池支持的最大线程数为32767;

ScheduledThreadPoolExecutor类

  • 基于线程池的ScheduledExecutorService的实现,底层通过线程池并发调度任务,该类继承ThreadPoolExecutor,使用无界队列策略,即使用corePoolSize个线程和无界的workQueue;
  • 不推荐将corePoolSize设置为0,或者allowCoreThreadTimeOut设置为true,因为当任务执行周期较长超过keepAliveTime时,可能导致任务执行时,池中的线程都被回收的情况;
  • 当任务在执行前被cancel后,将不会被执行,任务不会从队列中立即删除,而是等到超过delay时间后,才会从队列中删除。通过setRemoveOnCancelPolicy(true)可以设置cancel后立即删除任务;
  • 当一个任务同时被scheduleAtFixedRate和scheduleAtFixedRate调度,可能会在不同的线程中执行,但不会产生重叠执行的情况,因为前一次执行happens-before后一次执行;

Executors类

Executors类的主要功能包括如下:

  • 提供ExecutorService的创建方法,并使用DelegatedExecutorService包装,仅暴露接口方法,防止外部修改操作;
  • 提供ScheduledExecutorService的创建方法,并使用DelegatedScheduledExecutorService包装,仅暴露接口方法,防止外部修改操作;
  • 提供ThreadFactory的创建方法;
  • 提供Callable类型转换方法;

ExecutorService线程池相关工具方法

ExecutorService newSingleThreadExecutor()

无界队列策略,创建只有一个线程的线程池,因为使用了包装器,可以防止外部修改;

image

ExecutorService newFixedThreadPool(int nThreads)

无界队列策略,创建固定大小的线程池,使用无界队列;keepAliveTime为0;

image

image

ExecutorService newCachedThreadPool()

直接传递任务策略,使用SynchronousQueue作为workQueue;

image

ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

image

ScheduledExecutorService newSingleThreadScheduledExecutor()

创建单线程的任务调度;
image

ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建指定线程池大小的任务调度;

image

ExecutorService newWorkStealingPool(int parallelism)

创建一个ForkJoinPool,使用parallelism指定并发度;

image

ExecutorService newWorkStealingPool()

创建ForkJoinPool,使用处理器个数作为并发度;

image

目录
相关文章
|
1天前
|
Java API
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
【JAVA进阶篇教学】第三篇:JDK8中Stream API使用
|
1天前
|
缓存 Java 数据库
Java并发编程学习11-任务执行演示
【5月更文挑战第4天】本篇将结合任务执行和 Executor 框架的基础知识,演示一些不同版本的任务执行Demo,并且每个版本都实现了不同程度的并发性。
19 4
Java并发编程学习11-任务执行演示
|
1天前
|
存储 安全 Java
12条通用编程原则✨全面提升Java编码规范性、可读性及性能表现
12条通用编程原则✨全面提升Java编码规范性、可读性及性能表现
|
1天前
|
缓存 Java 程序员
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
关于创建、销毁对象⭐Java程序员需要掌握的8个编程好习惯
|
2天前
|
网络协议 Dubbo Java
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
【网络编程】理解客户端和服务器并使用Java提供的api实现回显服务器
8 0
|
2天前
|
SQL Java 数据库连接
JDBC Java标准库提供的一些api(类+方法) 统一各种数据库提供的api
JDBC Java标准库提供的一些api(类+方法) 统一各种数据库提供的api
9 0
|
2天前
|
缓存 Java 数据库
Java并发编程中的锁优化策略
【5月更文挑战第9天】 在高负载的多线程应用中,Java并发编程的高效性至关重要。本文将探讨几种常见的锁优化技术,旨在提高Java应用程序在并发环境下的性能。我们将从基本的synchronized关键字开始,逐步深入到更高效的Lock接口实现,以及Java 6引入的java.util.concurrent包中的高级工具类。文中还会介绍读写锁(ReadWriteLock)的概念和实现原理,并通过对比分析各自的优势和适用场景,为开发者提供实用的锁优化策略。
3 0
|
2天前
|
算法 安全 Java
深入探索Java中的并发编程:CAS机制的原理与应用
总之,CAS机制是一种用于并发编程的原子操作,它通过比较内存中的值和预期值来实现多线程下的数据同步和互斥,从而提供了高效的并发控制。它在Java中被广泛应用于实现线程安全的数据结构和算法。
17 0
|
2天前
|
JavaScript 小程序 Java
基于java的少儿编程网上报名系统
基于java的少儿编程网上报名系统
11 2
|
3天前
|
存储 安全 算法
掌握Java并发编程:Lock、Condition与并发集合
掌握Java并发编程:Lock、Condition与并发集合
11 0