Executor框架
Executor
框架将任务的提交与任务的执行解耦了。
Executor
顶层接口,Executor
中只有一个execute
方法,用于执行任务。线程的创建、调度等细节均由其子类实现ExecutorService
继承并扩展了Executor
,在ExecutorService
内部提供了更全面的提交机制以及线程池关闭等方法。ThreadPoolExecutor
:实现了ExecutorService
接口,线程池机制都封装在这个类中。ScheduledExecutorService
:实现了ExecutorService
接口,增加了定时任务的相关方法。ScheduledThreadPoolExecutor
:继承自ThreadPoolExecutor
并实现了ScheduledExecutorService
接口ForkJoinPool
:是一种支持任务分解的线程池,一般配合接口ForkJoinTask
使用。
ThreadPoolExecutor
配置ThreadPoolExecutor
线程池时,要避免线程池大小出现过大或过小的情况。如果线程池过大,那么大量的线程将会在cpu及内存资源上发生竞争,从而导致更高的内存使用量,严重情况下会导致资源耗尽;如果线程池过小,会导致空闲的cpu处理器无法工作,从而降低了吞吐率。要设置合适的线程池大小,需要考虑下面的几个因素:
CPU
个数( 可以通过Runtime.getRuntime().availableProcessors()
获取)- 内存大小
- 任务类型:
计算密集型
、I/O密集型
还是两者皆可。如:任务为计算密集型,当CPU
处理器个数为N
时,线程池大小为N+1
比较合适。
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
ThreadPoolExecutor
继承自AbstractExecutorService
类(实现了ExecutorService
接口),ThreadPoolExecutor
中有四个构造器,但最后都会调用到上面代码中的这个,来看构造器中的这些参数代表的意义:
corePoolSize(int类型)
: 核心线程数,默认一直存活(即使线程处于空闲状态)。maximumPoolSize(int类型)
: 线程池允许的最大线程数,其值大小>=corePoolSize
。keepAliveTime(long类型)
: 线程的存活时间,默认是超过corePoolSize
大小之后启动的非核心线程的存活时间,当线程池设置allowCoreThreadTimeOut=true
时,对核心线程也会起作用。unit(TimeUnit类型)
: 时间单位
NANOSECONDS //纳秒
MICROSECONDS //微秒
MILLISECONDS //毫秒
SECONDS //秒
MINUTES //分
HOURS //小时
DAYS //天
workQueue(BlockingQueue<Runnable>)
: 阻塞队列(实现了BlockingQueue
接口),当线程数超过核心线程数corePoolSize
大小时,会将任务放入阻塞队列中,ThreadPoolExecutor
中使用了下面几种队列:ArrayBlockingQueue :数组实现的有界阻塞队列,队列满时,后续提交的任务通过
handler
中的拒绝策略去处理。LinkedBlockingQueue:链表实现的阻塞队列,默认大小是
Integer.MAX_VALUE
(无界队列),也可以通过传入指定队列大小capacity
。SynchronousQueue:内部并没有缓存数据,缓存的是线程。当生产者线程进行添加操作(
put
)时必须等待消费者线程的移除操作(take
)才会返回。SynchronousQueue
可以用于实现生产者与消费者的同步。PriorityBlockingQueue:二叉堆实现的优先级阻塞队列,传入队列的元素不能为
null
并且必须实现Comparable
接口。
DelayQueue: 延时阻塞队列,队列元素需要实现Delayed
接口。
threadFactory(ThreadFactory)
: 线程工厂,用于创建线程。Executors
中使用了默认的DefaultThreadFactory
:private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
也可以自定义ThreadFactory
,在newThread
中自行配置Thread
(如:配置线程名、是否是守护线程、线程优先级等)。
handler(RejectedExecutionHandler)
: 提交的任务被拒绝执行的饱和策略,通常有下面几种形式:
AbortPolicy: 默认饱和策略,丢弃任务并抛出RejectedExecutionException异常。调用者可以捕获这个异常并自行处理。
CallerRunsPolicy:线程池中不再处理该任务,由调用线程处理该任务。
DiscardPolicy:当任务无法添加到队列中等待执行时,DiscardPolicy策略会丢弃任务,并且不抛异常。
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试提交新的任务。
具体使用哪个饱和策略要根据具体的业务场景指定。当然也可以自定义RejectedExecutionHandler
来自行决定拒绝任务的处理策略。
执行流程
线程池ThreadPoolExecutor的工作流程大致分为下面几步:
- 当工作线程数小于核心线程数(
corePoolSize
)时,直接创建核心线程去执行任务 - 当线程数超过核心线程数(
corePoolSize
)时,将任务加入等待队列(BlockingQueue
)中 - 队列满时,继续创建非核心线程去执行任务(注意:核心线程
corePoolSize
+非核心线程<=maximumPoolSize
)
简单总结一下执行顺序:核心线程数满->队列满->最大线程数满->任务拒绝策略
用流程图大致表示为:
线程池状态
ThreadPoolExecutor
有三种状态:运行、关闭、终止
。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING
: 运行状态,接收并处理队列中的任务SHUTDOWN
: 不再接收新任务,队列中已提交的任务执行完成。STOP
: 尝试取消所有正在运行中的任务,并且不再处理队列中尚未开始执行的任务。TIDYING
: 所有任务已终止,线程池中线程数为变为0,此时线程池状态变为TIDYING
。TERMINATED
: 线程池终止。
AsyncTask
AsyncTask
是Android
内置线程池,相当于封装了Thread+Handler
,可以很方便地在后台执行耗时任务并将结果更新到UI线程
。AsyncTask
内部的线程池是通过ThreadPoolExecutor
实现的。
public abstract class AsyncTask<Params, Progress, Result> {
......
}
AsyncTask
是一个抽象类,几个关键方法如下:
onPreExecute()
:运行在UI线程,可以用来做一些初始化工作doInBackground(Params…)
:运行在子线程,用于执行一些耗时操作,执行过程中可以通过publishProgress(Progress…values)
来通知UI线程任务的进度。onPostExecute(Result result)
:运行在UI线程,doInBackground
执行完毕后返回Result
,并将该值传递到onPostExecute(Result result)
中。onProgressUpdate(Progress…)
:运行在UI线程,用于显示doInBackground
的执行进度,在doInBackground()
中执行publishProgress(Progress…values)
后会回调到此方法。onCancelled()
:运行在UI线程,当调用cancel(true)
后会回调此方法。此时onPostExecute()
不会再执行。
注:AsyncTask
有三种泛型类型,Params,Progress、Result
(可以都指定为空,如AsyncTask<Void, Void, Void>
),其中:
Params
:在execute(Params... params)
传递,会传递到doInBackground(Params…)
中。Progress
:后台任务进度值,在onProgressUpdate(Progress…)
使用Result
:最终返回的结果类型,在onPostExecute(Result result)
使用
AsyncTask中任务的串行&并行
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
//并行线程池
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//任务串行
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
//任务并行
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
AsyncTask执行任务可以通过execute()或者executeOnExecutor()去提交并执行任务,其中execute()是串行执行任务,而executeOnExecutor是并行执行任务。其中并行线程池就是用的THREAD_POOL_EXECUTOR
,来看串行是如何实现的:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
串行并不是将任务直接交给THREAD_POOL_EXECUTOR
去执行,而是内部自行维护了一个mTasks
双向队列,每次有新Runnable
任务来时,先添加到队列中,然后判断当前是否有正在执行的任务(mActive != null
即表示有正在执行的任务),没有的话才会将任务提交给线程池执行;否则会等待前一个任务执行完成,然后从队列中取出新任务(按FIFO
顺序)继续交给线程池执行。
参考
【1】Java多线程复习与巩固(六)--线程池ThreadPoolExecutor详解
【2】彻底弄懂 Java 线程池原理
【3】Executors框架:https://blog.csdn.net/tongdanping/article/details/79604637
【4】AsyncTask:https://juejin.im/post/5b65c71af265da0f9402ca4a
【5】AsyncTask:https://blog.csdn.net/SEU_Calvin/article/details/52172248