线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,然后告诉你面试考啥,怎么答。
为什么要用线程池?
简洁的答两点就行。
- 降低系统资源消耗。
- 提高线程可控性。
如何创建使用线程池?
JDK8提供了五种创建线程池的方法:
1.创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.(JDK8新增)会根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。
注意返回的是ForkJoinPool对象。
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
什么是ForkJoinPool:
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
使用一个无限队列来保存需要执行的任务,可以传入线程的数量;不传入,则默认使用当前计算机中可用的cpu数量;使用分治法来解决问题,使用fork()和join()来进行调用。
3.创建一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
4.创建一个单线程的线程池。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
5.创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
上层源码结构分析
Executor结构:
Executor
一个运行新任务的简单接口
public interface Executor { void execute(Runnable command); }
ExecutorService
扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法
AbstractExecutorService
对ExecutorService接口的抽象类实现。不是我们分析的重点。
ThreadPoolExecutor
Java线程池的核心实现。
ThreadPoolExecutor源码分析
属性解释
// AtomicInteger是原子类 ctlOf()返回值为RUNNING; private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示线程状态 private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位表示workerCount容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 能接收任务且能处理阻塞队列中的任务 private static final int RUNNING = -1 << COUNT_BITS; // 不能接收新任务,但可以处理队列中的任务。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,不处理队列任务。 private static final int STOP = 1 << COUNT_BITS; // 所有任务都终止 private static final int TIDYING = 2 << COUNT_BITS; // 什么都不做 private static final int TERMINATED = 3 << COUNT_BITS; // 存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue;
值的注意的是状态值越大线程越不活跃。