线程池原理
线程池有哪些?
一般开发者是利用 Executors 提供的统一线程创建方法,取创建不同配置的线程池,主要区别在于不同的 ExecutorService类型或者不同的初始参数。
Executors 提供了 5 种不同的线程池创建方式:
- newChachedThreadPool() ,可以用来处理大量短时间工作任务的线程池,具有如下几个特点:试图缓存线程并重用,当无缓存线程可用时,会创建新的工作线程,如果线程限制的时间超过 60秒,则被终止移除缓存;长时间闲置时,这种线程池不会消耗什么资源,内部使用 SynchronousQueue,作为工作队列。
- newFixedThreadPool(it nThreads),重用指定数目(nThreads)的线程, 采用的是无界的工作队列,任何时候最多有 nThreads 工作线程是活动的,这个意味着,如果任务数量超过了活动队列数据,将在工作队列中等待空闲线程出现。
- newSingleThreadExecutor(),它的特点是工作线程被线程为1,操作的是一个无界工作队列,索引它保证了所有任务的都是被顺序执行的,最多会有一个任务处于活动状态,并且不允许使用这改动线程池实例,因此可以避免其改变线程数目。
- newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize),创建的是个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。
- newWorkStealingPool(in parallesim),这四经常被忽略的线程池,JDK8 后才加入这个创建方法,内部会构建 ForkJoinPool ,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序。
线程池内部工作流程
- 工作队列负责存储用户提交的各个任务,这个工作队列,可以是容量为 0 的SynchronousQueue(new ChacheThreadPool),也可以是固定大小线程池 newFiexedThreadPool ,那样使用 LinkedBlockingQueue。
private fnal BlockingQueue<Runnable> workQueue;
- 内部“线程池”,是指保持工作线程的集合,线程池需要在创建中管理线程创建销毁,例如,带缓存的线程池,当任务压力较大时,线程池会创建新的工作线程。当业务压力褪去,线程会在闲置一段时间后结束线程。线程池的工作线程被抽象为静态内部类 Worker,基于 AQS 实现。
private fnal HashSet<Worker> workers = new HashSet<>();
- ThreadFactory 提供所需要的创建线程逻辑。
- 如果任务提交时被拒绝,比如线程池已经处于 Shutdown 状态,需要使用拒绝策略,Java 标准库中提供了类似 ThreadPoolExecutor.AbortPolicy 等默认实现。也可以自定义。
线程池构造函数参数说明
线程池构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize,所谓的核心线程数,可以大致理解为长期驻留的线程数目(除非设置了allowCoreThreadTimeOut)。对于不同的线程池,这个值可能会有很大区别,比如newFixedThreadPool会将其设置为nThreads,而对于newCachedThreadPool则是为0。
- maximumPoolSize,顾名思义,就是线程不够时能够创建的最大线程数。同样进行对比,对于newFixedThreadPool,当然就是nThreads,因为其要求是固定大小,而newCachedThreadPool则是Integer.MAX_VALUE。
- keepAliveTime和TimeUnit,这两个参数指定了额外的线程能够闲置多久,显然有些线程池不需要它。
- workQueue,工作队列,必须是BlockingQueue。
线程池的生命周期
private fnal AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 真正决定了工作线程数的理论上限 private satic fnal int COUNT_BITS = Integer.SIZE - 3; private satic fnal int COUNT_MASK = (1 << COUNT_BITS) - 1; // 线程池状态,存储在数字的高位 private satic fnal int RUNNING = -1 << COUNT_BITS; … // Packing and unpacking ctl private satic int runStateOf(int c) { return c & ~COUNT_MASK; } private satic int workerCountOf(int c) { return c & COUNT_MASK; } private satic int ctlOf(int rs, int wc) { return rs | wc; }
execute 实现代码说明
public void execute(Runnable command) { … int c = ctl.get(); // 检查工作线程数目,低于corePoolSize则添加Worker if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // isRunning就是检查线程池是否被shutdown // 工作队列可能是有界的, ofer是比较友好的入队方式 if (isRunning(c) && workQueue.ofer(command)) { int recheck = ctl.get(); // 再次进行防御性检查 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 尝试添加一个worker,如果失败以为着已经饱和或者被shutdown了 else if (!addWorker(command, false)) reject(command); }
线程池使用需要注意的问题
- 避免任务堆积。前面我说过newFixedThreadPool是创建指定数目的线程,但是其工作队列是无界的,如果工作线程数目太少,导致处理跟不上入队的速度,这就很有可能占用大量系统内存,甚至是出现OOM。诊断时,你可以使用jmap之类的工具,查看是否有大量的任务对象入队。
- 避免过度扩展线程。我们通常在处理大量短时任务时,使用缓存的线程池,比如在最新的HTTP/2 client API中,目前的默认实现就是如此。我们在创建线程池的时候,并不能准确预计任务压力有多大、数据特征是什么样子(大部分请求是1K 、 100K还是1M以上?),所以很难明确设定一个线程数目。
- 另外,如果线程数目不断增长(可以使用jstack等工具检查),也需要警惕另外一种可能性,就是线程泄漏,这种情况往往是因为任务逻辑有问题,导致工作线程迟迟不能被释放。建议你排查下线程栈,很有可能多个线程都是卡在近似的代码处。避免死锁等同步问题,对于死锁的场景和排查,
- 尽量避免在使用线程池时操作ThreadLocal,工作线程的生命周期通常都会超过任务的生命周期。
如何选择线程池大小
- 如果我们的任务主要是进行计算,那么就意味着CPU的处理能力是稀缺的资源,我们能够通过大量增加线程数提高计算能力吗?往往是不能的,如果线程太多,反倒可能导致大量的上下文切换开销。所以,这种情况下,通常建议按照CPU核的数目N或者N+1。
- 如果是需要较多等待的任务,例如I/O操作比较多,可以参考Brain Goetz推荐的计算方法:
线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)
这些时间并不能精准预计,需要根据采样或者概要分析等方式进行计算,然后在实际中验证和调整。
- 上面是仅仅考虑了CPU等限制,实际还可能受各种系统资源限制影响,例如我最近就在Mac OS X上遇到了大负载时<ephemeral端口受限>的情况。当然,我是通过扩大可用端口范围解决的,如果我们不能调整资源的容量,那么就只能限制工作线程的数目了。这里的资源可以是文件句柄、内存等。
- http://danielmendel.github.io/blog/2013/04/07/benchmarkers-beware-the-ephemeral-port-limit/