在编程中经常会使用线程池来异步处理任务,但是每个线程池的创建和销毁都有一定的开销。如果每次执行一个任务都需要开一个新线程去执行,则这些线程的创建和销毁将消耗大量的资源;并且线程都是各自为政,很难对其进行控制,更何况有一堆的线程在执行。这时就需要线程池来对线程进行管理。在java中提供了 Executor框架用于把任务的提交和执行解耦,任务的调教交给 Runnable 和 Callable,而Executor 框架用来处理任务。Executor 框架中最核心的成员就是 ThreadPoolExeeutor,它是线程池的核心实现类.
ThreadPoolExector
可以通过 ThreadPoolExector来创建一个线程池,ThreadPoolExecutor 类一共有4个构造方法,其中拥有最多参数的构造方法如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
corePoolSize: 核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运行的线程数少于 corePoolSize ,则创建新线程来处理任务;如果等于或者多于 crePoolSize ,则不再创建。如果调用线程池的 prrestartAllcoreThread 方法,线程池会提前创建并启动所有核心线程来等待任务。
maximumPoolSize: 线程池允许创建的最大线程数。如果任务队列满了并且线程数小于 maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。
keepAliveTime: 非核心线程闲置的超过时间。超过这个时间则回收。如果任务很多,并且每个任务的执行事件很短,则可以调大KeepAliveTime 来提高线程的利用率。另外,如果设置allowCoreThreadTimOut属性为 true 时,keepAliveTime 也会应用到核心线程上。
TimeOut: keepAliveTime 参数的时间单位。可选的单位有 天(days),小时(hours),分钟(minutes),秒(seconds),毫秒(milliseconds)等。
workQueue: 任务队列。如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务队列是BlockingQueue 类型,也就是阻塞队列。
ThreadFactory: 线程工厂。 可以用线程工厂给每个创建出来的线程设置名字。一般情况下无序设置改参数。
RejectdExecutionHandler: 饱和策略。这是当任务队列和线程池都满了时所采用的应对策略,默认是AbordPolicy,表示无法处理新任务,并抛出 RejectedExecutionException异常。此处还有3种策略,他们分别如下。
- CallerRunsolicy : 用调用者所在的线程来处理任务。此策略提供加单的反馈控制机制,能够缓解新任务的提交速度。
- DiscardPolicy: 不能执行的任务,并将该任务删除
DiscardOldestPolicy: 丢弃队列最近的任务,并执行当前的任务。
线程池的处理流程和原理
- 提交任务后,线程池先判断线程数时候达到了核心线程数。如果未达到核心线程数,则创建核心线程处理任务;否则,就执行下一步操作。
- 接着线程池判断任务队列是否满了。如果没满,则将任务添加到队列中;否则,就执行下一步操作。
- 接着因为线程池判断满了,线程池就判断线程数是否达到了最大线程数。如果未达到,则创建非核心线程处理任务;否则,就执行饱和策略,默认会抛出RejectedExecutionException异常。
- 如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。
- 如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。
- 如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理。
- 如果线程数超过了最大线程数,则执行饱和策略。
线程池的种类
FixedThreadPool
FixedThreadPool是可重用固定线程数的线程。在Executors 类中提供了创建FixedThreadPool 的方法,如下所示,
public static ExecutorService newFixedThreadPool(int var0) { return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
FixedThreadPool 的 corePoolSize 和maximumPoolsize 都设置为 创建 FixedThreadPool 指定的参数Threads ,也就意味着FixedThreadPool 只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime 设置为0L,意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime 是无效的参数。另外,任务队列采用了无界阻塞队列 LinkBlockingQueue(容量默认为Integer.MAX_VALUE)。
LinkBlockingQueue(容量默认为Integer.MAX_VALUE)。
当执行execute方法时,如果当前运行的线程未达到 corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加·到·LinkedBlockingQueue中,FixedThreadPool 就是一个有固定核心线程的线程池,并且这些线程不会被回收。当线程数超过 corePoolize 时,就将任务存储在任务队列中,当线程池有空闲线程是,则从任务队列中去任务执行。
public class MyClass { public static void main(String[] args) { // ExecutorService fipool= Executors.newFixedThreadPool(5); for (int i=0;i<10;i++){ fipool.execute(getThread(i)); } //退出线程池 fipool.shutdown(); } private static Runnable getThread(final int i){ return new Runnable() { @Override public void run() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i); } }; } }
CachedThreadPool
CachedThreadPool 是一个根据需要创建的线程池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
CachedThreadPool 的 corePoolSize 为0,maximumPoolSize 设置为 Integer.Max_value.这意味着CachedThreadPool 没有核心线程,非核心线程是无界的。 keepAliveTime 设置为 60L,则空闲线程等待任务的最长时间为60s 。在此用了阻塞队列 SynchronousQueue ,它是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。
当执行 excute方法时,首先会执行 SynchrnousQueue的 offer 方法来提交任务,并且查询线程池是否有空线程执行 SynuchronousQueue 的 poll 方法移除任务。如果有子线程配对成功,将任务交给这个空闲的线程处理;当执行 excute方法时则这个空闲线程将终止。因为 maximumPoolSize 是无界的,所以如果提交的任务大于线程池中线程处理任务的速度就会不断创建新线程。另外,每次提交任务都会立即有线程去处理。所以 CachedThreadPool 比较适合于大量需要立即执行并且耗时较少的任务。
public class MyClass { public static void main(String[] args) { // ExecutorService fipool= Executors.newCachedThreadPool(); for (int i=0;i<10;i++){ cachPool.execute(getThread(i)); } //退出线程池 // cachPool.shutdown(); } private static Runnable getThread(final int i){ return new Runnable() { @Override public void run() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i); } }; } }
SingleThreadExecutor
SingThreadExecutor 是使用单个工作线程的线程池,其创建源码如下所示
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
corePoolSize 和 maxImumPoolSize 都为1,意味着 SingleThradExecutor 只有一个核心线程,其他的参数和 FixedThreadPool 一样。
阻塞队列使用的是LinkedBlockingQueue,若有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先进先出的顺序执行任务。需要注意的是,使用完之后需要shutdown线程池,因为它与CachedThreadPool 一样,在没有任务完成时,不会释放资源。
public class MyClass { public static void main(String[] args) { // ExecutorService singPool= Executors.newSingleThreadExecutor(); for (int i=0;i<10;i++){ singPool.execute(getThread(i)); } //退出线程池 singPool.shutdown(); } private static Runnable getThread(final int i){ return new Runnable() { @Override public void run() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i); } }; } }
ScheduledThreadPool
ScheduledThreadPool 是一个能实现定时和周期性任务的线程池,
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
这里创建了 ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自 自ThreadPoolExecutor,它主要用于给定延时之后的任务或者定期处理任务。
ScheduledThreadPoolExecutor 构造方法如下
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
最终调用的是 ThreadPoolExecutor 的构造方法。corePoolSize 是传入的固定值,maximumPoolSize的值是 Integer.MAX_VALUE .因为采用的 DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。它的执行过程如图
当执行 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay方法时,会向DelayedWorkQueue 添加一个 实现 RunnableScheduledFuture 接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到 corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去 DelayedWorkQueue 中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为下次要执行的时间并放回到DelayedWorkQueue中
public class MyClass { public static void main(String[] args) { // ScheduledExecutorService singPool= Executors.newScheduledThreadPool(10); singPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("开始"); } },3,3, TimeUnit.SECONDS); } }
scheduleAtFixedRate:以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
scheduleWithFixedDelay:是以上一个任务结束时开始计时,period时间过去后,立即执行。