一、 前言
最近对重构Dubbo服务线程池调优,工作线程使用 CachedThreadPool 线程策略,可是上线之后,出现线程池一路上升,差点导致线上事故。
所以本篇文章对线程池揭开谜底。
二、Dubbo线程池介绍
Dubbo中 CachedThreadPool源代码
package org.apache.dubbo.common.threadpool.support.cached; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; import org.apache.dubbo.common.threadpool.ThreadPool; import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.dubbo.common.constants.CommonConstants.ALIVE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.CORE_THREADS_KEY; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ALIVE; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CORE_THREADS; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_QUEUES; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_THREAD_NAME; import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY; import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY; import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY; /** * This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for * the upcoming request. * * @see java.util.concurrent.Executors#newCachedThreadPool() */ public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { //1 获取线程名称前缀 如果没有 默认是Dubbo String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); //2. 获取线程池核心线程数大小 int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); //3. 获取线程池最大线程数大小,默认整型最大值 int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); //4. 获取线程池队列大小 int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); //5. 获取线程池多长时间被回收 单位毫秒 int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); //6. 使用JUC包里的ThreadPoolExecutor创建线程池 return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
可以看出,Dubbo本质上是使用JUC包里的ThreadPoolExecutor创建线程池,源码如下
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
大致流程图如下:
1、 当线程池小于corePoolSize
时,新任务将创建一个新的线程,即使此时线程池中存在空闲线程。
2、 当线程池达到corePoolSize
时,新提交的任务将被放入workQueue
中,等待线程池任务调度执行。
3、 当workQueue
已满,且maximumPoolSize
>corePoolSize
时,新任务会创建新线程执行任务。
4、 当提交任务数超过maximumPoolSize
时,新提交任务由RejectedExecutionHandler
处理。
5、 当线程池中超过corePoolSize
时,空闲时间达到keepAliveTime
时,关闭空闲线程。
另外,当设置了allowCoreThreadTimeOut(true)
时,线程池中corePoolSize
线程空闲时间达到keepAliveTime
也将关闭。
RejectedExecutionHandler 默认提供了四种拒绝策略
1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;
2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交。
4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
值得注意的是,Dubbo中拒绝策略 AbortPolicyWithReport 实际上是继承了 ThreadPoolExecutor.AbortPolicy 策略,主要是多打印了一些关键信息和堆栈信息。