线程池_02_Executor框架
- 一、Executor 的两级调度
- 背景知识
- 在HotSpot Vm 的线程模型中,Java 线程被一对一映射为本地操作系统线程。Java 线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。
- Executor
- 在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;换句话说就是应用程序通过Executor框架控制上层的调度。
- 在底层,操作系统内核将这些线程映射到硬件处理器上。换句话说,下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
- 背景知识
- 二、Executor 的组成
- 1、任务;
- 包括被执行任务需要实现的接口:Runbable或Callable 接口
- 2、任务的执行;
- 包括任务执行机制的核心接口Executor,以及继承自Executor 的ExecutorService接口。Executor 框架有两个关键类实现了ExecutorService 接口
- 3、异步计算的结果;
- 包括接口Future 和 实现Future接口的FutureTask类。
- 1、任务;
- 三、Executor 使用
- 1、主线程首先创建Runnable或Callable的任务对象A;
- 2、然后把对象A直接交给ExecutorService通过Execute或Submit方法执行;
- 3、如果使用的是submit方法,会返回一个实现Future接口的对象,主线程可以执行FutureTask.get()方法来等待任务执行完成。也可以执行FutureTask.cancle()来取消任务。
- 四、Executor 主要的类与接口的简介
- 1、Executor
- 接口,是Executor框架的基础,它将任务的提交与任务的执行分离开来。
- 2、ThreadPoolExecutor
- 线程池的核心实现类,用来执行被提交的任务
- 3、ScheduledThreadPoolExecutor(定时任务类,这里不讨论)
- 实现类,可以在给定的延迟后运行命令,或者定期执行命令。
- 4、Future接口和实现Future接口的FutureTask类,代表异步结算的结果
- 5、Runnable 和Callable接口的实现类
- 1、Executor
- 五、Executor 主要的类与接口的详解
- ThreadPoolExecutor
- 四个核心参数:
- 1、corePool 核心线程池的大小;
- 2、maximumPool 最大线程池的大小;
- 3、BlockingQueue 用来暂时保存任务的工作队列;
- 4、RejectedExecutionHandler 当ThreadPoolExecutor 可以关闭或ThreadExecutor 已经饱和时(达到了最大线程池大小并且工作队列已满),execute()方法将要调用的Handler。
- ThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建3种类型的ThreadPoolExecutor: FixedThreadPool、SingleThreadExecutor、CachedThreadPool;
- 1、FixedThreadPool ,创建固定线程数的API。
- 创建的方法:
- FixedThreadPool 的核心线程和最大线程数是一样的;线程的空闲存活时间为0,则多余的空闲线程会被立即终止。
- 执行execute(),流程如下:
- 1、如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务;
- 2、如果当前运行的线程数等于corePoolSize,将任务加入LinkedBlockingQueue;
- 3、线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
- 使用无界队列LinkedBlockingQueue,影响:
- 1、如果当前运行的线程数等于corePoolSize,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;
- 2、maximumPoolSize 与 keepAliveTime 会是无效的;
- 3、运行中的FixedThreadPool 不会拒绝任务(因为工作队列可以无限增大);
- 创建的方法:
- 2、SingleThreadExecutor,使用单个工作线程的Executor。
- 创建的方法:
- 于corePoolSize与maximumPoolSize 设为1,其它参数与FixedThreadPool一样。
- 执行execute(),流程如下:
- 1、如果线程池中无运行的线程,则创建一个新线程来执行任务;
- 2、当前线程池中有一个运行的线程,将任务加入LinkedBlockingQueue;
- 3、线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。
- 使用无界队列LinkedBlockingQueue,与FixedThreadPool一样的结果。
- 创建的方法:
- 3、CachedThreadPool,根据需要创建新线程的线程池。
- 创建的方法:
- corePoolSize 为0,maximumPoolSize被设置为Integer.MAX_VALUE,即是maximumPoolSize无界的。keepAliveTime 设为60秒,空闲存活60秒后被终止。
- CachedThreadPool 使用没有容量的SynchronousQueue作为线程池的工作队列,但是maximumPoolSize是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
- 创建的方法:
- 总结:
- FixedThreadPool 与 SingleThreadExecutor 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常
- CachedThreadPool => 允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常
- 1、FixedThreadPool ,创建固定线程数的API。
- 四个核心参数:
- FutureTask
- 使用:
- 1、由调用线程直接执行FutureTask.run(); ---- 实现了Runnable接口
- 2、通过ExecutorService.submit()方法返回一个FutureTask; ---- 实现了Future 接口
- 实现原理
- 基于AbstractQueuedSynchronizer(简称AQS)。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。
- 基于AQS实现的同步器包含两种类型:
- 1、至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。
- 在FutureTask 类中get()和get(long timeout, TimeUnit unit)两个方法为acquire操作;
- 2、至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。
- 在FutureTask 类中run()和cancel()两个方法为acquire操作;
- 1、至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。
- 使用:
- ThreadPoolExecutor