线程池的内部结构
如图所示,线程池的内部结构主要由线程池管理器、工作线程、任务队列以及任务四部分组成。
线程池的阻塞队列
先上张图,表格左侧是线程池,右侧为它们对应的阻塞队列,你可以看到 5 种线程池对应了 3 种阻塞队列。
下面逐一说下它们的特点:
- LinkedBlockingQueue,底层是链表结构、采用先进先出原则,默认容量是 Integer.MAX_VALUE,几乎可以认为是无界队列(几乎不可能达到这个数)。而由于 FixedThreadPool 和 SingleThreadExecutor 的线程数是固定的,所以只能用容量无穷大的队列来存任务。
- SynchronousQueue,容量为 0,不做存储,只做转发。由于 CachedThreadPool 的最大线程数是 Integer.MAX_VALUE,有任务提交就转发给线程或者创建新线程来执行,并不需要队列存储任务。所以在自定义使用 SynchronousQueue 的线程池应该把最大线程数设置得尽量大,避免任务数大于最大线程数时,没办法把任务放到队列中也没有足够线程来执行任务的情况。
- DelayedWorkQueue,内部用的是堆数据结构,初始容量为 16,跟 hashmap 一样动态扩容,对任务延时长短进行排序。
为什么不自动创建线程池?
阿里巴巴 Java 规约也约定了,手动创建线程池,效果会更好。为什么呢?回答这个问题之前,先来看看五种线程池初始化的方法:
// FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0 L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ()); } // SingleThreadExecutor public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0 L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue < Runnable > ())); }
首先是 FixedThreadPool 和 SingleThreadExecutor,它两的问题在于都是用默认容量的无界队列 LinkedBlockingQueue,当任务处理慢时,队列迅速积压任务并占用大量内存,发生 OOM(内存溢出)。所以在使用时我们可以根据业务指定队列长度:
ExecutorService threadPool = new ThreadPoolExecutor(2, 5, 1 L, TimeUnit.SECONDS, new LinkedBlockingQueue < > (3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
然后是 CachedThreadPool,也可以发现一个问题:它默认的最大线程是 Integer.MAX_VALUE,当任务贼多时,它就会不断创建线程,而线程执行比较耗时来不及回收。最终也会造成 OOM,所以应该手动指定最大线程数。
// CachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60 L, TimeUnit.SECONDS, new SynchronousQueue < Runnable > ()); }
最后是 ScheduledThreadPool 和 ScheduledThreadPoolExecutor,这两问题就更大了。首先最大线程数是 Integer.MAX_VALUE,然后阻塞队列是 DelayedWorkQueue,它也是无界队列,最终还是会造成 OOM。
// ScheduledThreadPool public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // ScheduledThreadPoolExecutor public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }