2万字,看完这篇才敢说自己真的懂线程池!(二)

简介: 线程池可以说是 Java 进阶必备的知识点了,也是面试中必备的考点,可能不少人看了一些文章后能对线程池工作原理说上一二,但这还远远不够,如果碰到比较有经验的面试官再继续追问,很可能会被吊打,

线程池提交任务的两种方式

线程池创建好了,该怎么给它提交任务,有两种方式,调用 execute 和 submit 方法,来看下这两个方法的方法签名

// 方式一:execute 方法
public void execute(Runnable command) {
}
// 方式二:ExecutorService 中 submit 的三个方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

区别在于调用 execute 无返回值,而调用  submit 可以返回 Future,那么这个 Future 能到底能干啥呢,看它的接口

public interface Future<V> {
    /**
     * 取消正在执行的任务,如果任务已执行或已被取消,或者由于某些原因不能取消则返回 false
     * 如果任务未开始或者任务已开始但可以中断(mayInterruptIfRunning 为 true),则
     * 可以取消/中断此任务
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 任务在完成前是否已被取消
     */
    boolean isCancelled();
    /**
     * 正常的执行完流程流程,或抛出异常,或取消导致的任务完成都会返回 true
     */
    boolean isDone();
    /**
     * 阻塞等待任务的执行结果
     */
    V get() throws InterruptedException, ExecutionException;
    /**
     * 阻塞等待任务的执行结果,不过这里指定了时间,如果在 timeout 时间内任务还未执行完成,
     * 则抛出 TimeoutException 异常
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以用 Future 取消任务,判断任务是否已取消/完成,甚至可以阻塞等待结果。

submit 为啥能提交任务(Runnable)的同时也能返回任务(Future)的执行结果呢

44.jpg

原来在最后执行 execute 前用 newTaskFor 将 task 封装成了 RunnableFuture,newTaskFor 返回了 FutureTask 这个类,结构图如下

45.jpg

可以看到 FutureTask 这个接口既实现了 Runnable 接口,也实现 Future 接口,所以在提交任务的同时也能利用 Future 接口来执行任务的取消,获取任务的状态,等待执行结果这些操作。

execute 与 submit 除了是否能返回执行结果这一区别外,还有一个重要区别,那就是使用 execute 执行如果发生了异常,是捕获不到的,默认会执行 ThreadGroup 的 uncaughtException 方法(下图数字 2 对应的逻辑)46.jpg

所以如果你想监控执行 execute 方法时发生的异常,需要通过 threadFactory 来指定一个 UncaughtExceptionHandler,这样就会执行上图中的 1,进而执行 UncaughtExceptionHandler 中的逻辑,如下所示:

//1.实现一个自己的线程池工厂
ThreadFactory factory = (Runnable r) -> {
    //创建一个线程
    Thread t = new Thread(r);
    //给创建的线程设置UncaughtExceptionHandler对象 里面实现异常的默认逻辑
    t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
        // 在此设置统计监控逻辑
        System.out.println("线程工厂设置的exceptionHandler" + e.getMessage());
    });
    return t;
};
// 2.创建一个自己定义的线程池,使用自己定义的线程工厂
ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory);
//3.提交任务
service.execute(()->{
    int i=1/0;
});

执行以上逻辑最终会输出「线程工厂设置的exceptionHandler/ by zero」,通过这样的方式就能通过设定的 defaultUncaughtExceptionHandler 来执行我们的监控逻辑了。

如果用 submit ,如何捕获异常呢,当我们调用 future.get 就可以捕获

Callable testCallable = xxx;
Future future = executor.submit(myCallable);
try {
    future1.get(3));
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

那么 future 为啥在 get 的时候才捕获异步呢,因为在执行 submit 时抛出异常后此异常被保存了起来,而在 get 的时候才被抛出

47.jpg

关于 execute 和 submit 的执行流程 why 神的这篇文章写得非常透彻,我就不拾人牙慧了,建议大家好好品品,收获会很大!

ThreadPoolExecutor 源码剖析

前面铺垫了这么多,终于到了最核心的源码剖析环节了。

对于线程池来说,我们最关心的是它的「状态」和「可运行的线程数量」,一般来说我们可以选择用两个变量来记录,不过 Doug Lea 只用了一个变量(ctl)就达成目的了,我们知道变量越多,代码的可维护性就越差,也越容易出 bug, 所以只用一个变量就达成了两个变量的效果,这让代码的可维护性大大提高,那么他是怎么设计的呢

// ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // 结果:111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 结果: 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 结果: 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // 结果: 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 结果: 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 获取线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
}

可以看到,ctl 是一个 原子类的 Integer 变量,有 32 位,低 29 位表示线程数量, 29 位最大可以表示 (2^29)-1 (大概 5 亿多),足够记录线程大小了,如果未来还是不够,可以把 ctl 声明为 AtomicLong,高 3 位用来表示线程池的状态,3 位可以表示 8 个线程池的状态,由于线程池总共只有五个状态,所以 3 位也是足够了,线程池的五个状态如下

  • RUNNING: 接收新的任务,并能继续处理 workQueue 中的任务
  • SHUTDOWN: 不再接收新的任务,不过能继续处理 workQueue 中的任务
  • STOP: 不再接收新的任务,也不再处理 workQueue 中的任务,并且会中断正在处理任务的线程
  • TIDYING: 所有的任务都完结了,并且线程数量(workCount)为 0 时即为此状态,进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态
  • TERMINATED: 调用 terminated() 方法后即为此状态

线程池的状态流转及触发条件如下48.jpg

有了这些基础,我们来分析下 execute 的源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果当前线程数少于核心线程数(corePoolSize),无论核心线程是否忙碌,都创建线程,直到达到 corePoolSize 为止
    if (workerCountOf(c) < corePoolSize) {
        // 创建线程并将此任务交给 worker 处理(此时此任务即 worker 中的 firstTask)
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果线程池处于 RUNNING 状态,并且线程数大于 corePoolSize 或者 
    // 线程数少于 corePoolSize 但创建线程失败了,则将任务丢进 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 这里需要再次检查线程池是否处于 RUNNING 状态,因为在任务入队后可能线程池状态会发生变化,(比如调用了 shutdown 方法等),如果线程状态发生变化了,则移除此任务,执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池在 RUNNING 状态下,线程数为 0,则新建线程加速处理 workQueue 中的任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 这段逻辑说明线程数大于 corePoolSize 且任务入队失败了,此时会以最大线程数(maximumPoolSize)为界来创建线程,如果失败,说明线程数超过了 maximumPoolSize,则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

从这段代码中可以看到,创建线程是调用 addWorker 实现的,在分析 addWorker 之前,有必要简单提一下 Worker,线程池把每一个执行任务的线程都封装为 Worker 的形式,取名为 Worker 很形象,线程池的本质是生产者-消费者模型,生产者不断地往 workQueue 中丢 task, workQueue 就像流水线一样不断地输送着任务,而 worker(工人) 不断地取任务来执行

image.gif那么问题来了,为啥要把线程封装到 worker 中呢,线程池拿到 task 后直接丢给线程处理或者让线程自己去 workQueue 中处理不就完了?

将线程封装为 worker 主要是为了更好地管理线程的中断

来看下 Worker 的定义

// 此处可以看出 worker 既是一个 Runnable 任务,也实现了 AQS(实际上是用 AQS 实现了一个独占锁,这样由于 worker 运行时会上锁,执行 shutdown,setCorePoolSize,setMaximumPoolSize等方法时会试着中断线程(interruptIdleWorkers) ,在这个方法中断方法中会先尝试获取 worker 的锁,如果不成功,说明 worker 在运行中,此时会先让 worker 执行完任务再关闭 worker 的线程,实现优雅关闭线程的目的)
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 实际执行任务的线程
        final Thread thread;
        // 上文提到,如果当前线程数少于核心线程数,创建线程并将提交的任务交给 worker 处理处理,此时 firstTask 即为此提交的任务,如果 worker 从 workQueue 中获取任务,则 firstTask 为空
        Runnable firstTask;
        // 统计完成的任务数
        volatile long completedTasks;
        Worker(Runnable firstTask) {
            // 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0
            setState(-1); 
            this.firstTask = firstTask;
            // 根据线程池的 threadFactory 创建一个线程,将 worker 本身传给线程(因为 worker 实现了 Runnable 接口)
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            // thread 启动后会调用此方法
            runWorker(this);
        }
        // 1 代表被锁住了,0 代表未锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        // 尝试获取锁
        protected boolean tryAcquire(int unused) {
            // 从这里可以看出它是一个独占锁,因为当获取锁后,cas 设置 state 不可能成功,这里我们也能明白上文中将 state 设置为 -1 的作用,这种情况下永远不可能获取得锁,而 worker 要被中断首先必须获取锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 尝试释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        // 中断线程,这个方法会被 shutdowNow 调用,从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行
        void interruptIfStarted() {
            Thread t;
            // 中断也是有条件的,必须是 state >= 0 且 t != null 且线程未被中断
            // 如果 state == -1 ,不执行中断,再次明白了为啥上文中 setState(-1) 的意义
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

通过上文对 Worker 类的分析,相信大家不难理解 将线程封装为 worker 主要是为了更好地管理线程的中断 这句话。

理解了 Worker 的意义,我们再来看 addWorker 的方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取线程池的状态
        int rs = runStateOf(c);
        // 如果线程池的状态 >= SHUTDOWN,即为 SHUTDOWN,STOP,TIDYING,TERMINATED 这四个状态,只有一种情况有可能创建线程,即线程状态为 SHUTDOWN, 且队列非空时,firstTask == null 代表创建一个不接收新任务的线程(此线程会从 workQueue 中获取任务再执行),这种情况下创建线程是为了加速处理完 workQueue 中的任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果超过了线程池的最大 CAPACITY(5 亿多,基本不可能)
            // 或者 超过了 corePoolSize(core 为 true) 或者 maximumPoolSize(core 为 false) 时
            // 则返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 否则 CAS 增加线程的数量,如果成功跳出双重循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 如果线程运行状态发生变化,跳到外层循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // 说明是因为 CAS 增加线程数量失败所致,继续执行 retry 的内层循环
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 能执行到这里,说明满足增加 worker 的条件了,所以创建 worker,准备添加进线程池中执行任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加锁,是因为下文要把 w 添加进 workers 中, workers 是 HashSet,不是线程安全的,所以需要加锁予以保证
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //  再次 check 线程池的状态以防执行到此步时发生中断等
                int rs = runStateOf(ctl.get());
                // 如果线程池状态小于 SHUTDOWN(即为 RUNNING),
                // 或者状态为 SHUTDOWN 但 firstTask == null(代表不接收任务,只是创建线程处理 workQueue 中的任务),则满足添加 worker 的条件
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                                        // 如果线程已启动,显然有问题(因为创建 worker 后,还没启动线程呢),抛出异常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 记录最大的线程池大小以作监控之用
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 说明往 workers 中添加 worker 成功,此时启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 添加线程失败,执行 addWorkerFailed 方法,主要做了将 worker 从 workers 中移除,减少线程数,并尝试着关闭线程池这样的操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

从这段代码我们可以看到多线程下情况的不可预料性,我们发现在满足条件情况下,又对线程状态重新进行了 check,以防期间出现中断等线程池状态发生变更的操作,这也给我们以启发:多线程环境下的各种临界条件一定要考虑到位。

执行 addWorker 创建 worker 成功后,线程开始执行了(t.start()),由于在创建 Worker 时,将 Worker  自己传给了此线程,所以启动线程后,会调用  Worker 的 run 方法

public void run() {
    runWorker(this);
}

可以看到最终会调用  runWorker 方法,接下来我们来分析下 runWorker 方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // unlock 会调用 tryRelease 方法将 state 设置成 0,代表允许中断,允许中断的条件上文我们在 interruptIfStarted() 中有提过,即 state >= 0
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 如果在提交任务时创建了线程,并把任务丢给此线程,则会先执行此 task
        // 否则从任务队列中获取 task 来执行(即 getTask() 方法)
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池状态为 >= STOP(即 STOP,TIDYING,TERMINATED )时,则线程应该中断
            // 如果线程池状态 < STOP, 线程不应该中断,如果中断了(Thread.interrupted() 返回 true,并清除标志位),再次判断线程池状态(防止在清除标志位时执行了 shutdownNow() 这样的方法),如果此时线程池为 STOP,执行线程中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务前,子类可实现此钩子方法作为统计之用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后,子类可实现此钩子方法作为统计之用
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 如果执行到这只有两种可能,一种是执行过程中异常中断了,一种是队列里没有任务了,从这里可以看出线程没有核心线程与非核心线程之分,哪个任务异常了或者正常退出了都会执行此方法,此方法会根据情况将线程数-1
        processWorkerExit(w, completedAbruptly);
    }
}

来看看 processWorkerExit 方法是咋样的

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果异常退出,cas 执行线程池减 1 操作
    if (completedAbruptly) 
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 加锁确保线程安全地移除 worker 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // woker 既然异常退出,可能线程池状态变了(如执行 shutdown 等),尝试着关闭线程池
    tryTerminate();
    int c = ctl.get();
    //  如果线程池处于 STOP 状态,则如果 woker 是异常退出的,重新新增一个 woker,如果是正常退出的,在 wokerQueue 为非空的条件下,确保至少有一个线程在运行以执行 wokerQueue 中的任务    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

接下来我们分析 woker 从 workQueue 中取任务的方法 getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 如果线程池状态至少为 STOP 或者
        // 线程池状态 == SHUTDOWN 并且任务队列是空的
        // 则减少线程数量,返回 null,这种情况下上文分析的 runWorker 会执行 processWorkerExit 从而让获取此 Task 的 woker 退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 如果 allowCoreThreadTimeOut 为 true,代表任何线程在 keepAliveTime 时间内处于 idle 状态都会被回收,如果线程数大于 corePoolSize,本身在 keepAliveTime 时间内处于 idle 状态就会被回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // worker 应该被回收的几个条件,这个比较简单,就此略过
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
           // 阻塞获取 task,如果在 keepAliveTime 时间内未获取任务,说明超时了,此时 timedOut 为 true
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

经过以上源码剖析,相信我们对线程池的工作原理了解得八九不离十了,再来简单过一下其他一些比较有用的方法,开头我们提到线程池的监控问题,我们看一下可以监控哪些指标

  • int getCorePoolSize():获取核心线程数。
  • int getLargestPoolSize():历史峰值线程数。
  • int getMaximumPoolSize():最大线程数(线程池线程容量)。
  • int getActiveCount():当前活跃线程数
  • int getPoolSize():当前线程池中的线程总数
  • BlockingQueuegetQueue() 当前线程池的任务队列,据此可以获取积压任务的总数,getQueue.size()

监控思路也很简单,开启一个定时线程 ScheduledThreadPoolExecutor,定期对这些线程池指标进行采集,一般会采用一些开源工具如 Grafana + Prometheus + MicroMeter 来实现。

如何实现核心线程池的预热

使用  prestartAllCoreThreads() 方法,这个方法会一次性创建 corePoolSize 个线程,无需等到提交任务时才创建,提交创建好线程的话,一有任务提交过来,这些线程就可以立即处理。

如何实现动态调整线程池参数

  • setCorePoolSize(int corePoolSize) 调整核心线程池大小
  • setMaximumPoolSize(int maximumPoolSize)
  • setKeepAliveTime() 设置线程的存活时间

解答开篇的问题

其它问题基本都在源码剖析环节回答了,这里简单说下其他问题

1、Tomcat 的线程池和 JDK 的线程池实现有啥区别, Dubbo 中有类似 Tomcat 的线程池实现吗? Dubbo 中一个叫 EagerThreadPool 的东西,可以看看它的使用说明50.jpg

从注释里可以看出,如果核心线程都处于 busy 状态,如果有新的请求进来,EagerThreadPool 会选择先创建线程,而不是将其放入任务队列中,这样可以更快地响应这些请求。

Tomcat 实现也是与此类似的,只不过稍微有所不同,当 Tomcat 启动时,会先创建 minSpareThreads 个线程,如果经过一段时间收到请求时这些线程都处于忙碌状态,每次都会以 minSpareThreads 的步长创建线程,本质上也是为了更快地响应处理请求。具体的源码可以看它的 ThreadPool 实现,这里就不展开了。

2、我司网关 dubbo 调用线程池曾经出现过这样的一个问题:压测时接口可以正常返回,但接口 RT 很高,假设设置的核心线程大小为 500,最大线程为 800,缓冲队列为 5000,你能从这个设置中发现出一些问题并对这些参数进行调优吗?这个参数明显能看出问题来,首先任务队列设置过大,任务达到核心线程后,如果再有请求进来会先进入任务队列,队列满了之后才创建线程,创建线程也是需要不少开销的,所以我们后来把核心线程设置成了与最大线程一样,并且调用 prestartAllCoreThreads() 来预热核心线程,就不用等请求来时再创建线程了。

线程池的几个最佳实践

1、线程池执行的任务应该是互相独立的,如果互相依赖的话,可能导致死锁,比如下面这样的代码

ExecutorService pool = Executors
  .newSingleThreadExecutor();
pool.submit(() -> {
  try {
    String qq=pool.submit(()->"QQ").get();
    System.out.println(qq);
  } catch (Exception e) {
  }
});

2、核心任务与非核心任务最好能用多个线程池隔离开来

曾经我们业务上就出现这样的一个故障:突然很多用户反馈短信收不到了,排查才发现发短信是在一个线程池里,而另外的定时脚本也是用的这个线程池来执行任务,这个脚本一分钟可能产生几百上千条任务,导致发短信的方法在线程池里基本没机会执行,后来我们用了两个线程池把发短信和执行脚本隔离开来解决了问题。

3、添加线程池监控,动态设置线程池

如前文所述,线程池的各个参数很难一次性确定,既然难以确定,又要保证发现问题后及时解决,我们就需要为线程池增加监控,监控队列大小,线程数量等,我们可以设置 3 分钟内比如队列任务一直都是满了的话,就触发告警,这样可以提前预警,如果线上因为线程池参数设置不合理而触发了降级等操作,可以通过动态设置线程池的方式来实时修改核心线程数,最大线程数等,将问题及时修复。

总结

本文详细剖析了线程池的工作原理,相信大家对其工作机制应该有了较深入的了解,也对开头的几个问题有了较清楚的认识,本质上设置线程池的目的是为了利用有效的资源最大化性能,最小化风险,同时线程池的使用本质上是为了更好地为用户服务,据此也不难明白 Tomcat, Dubbo 要另起炉灶来设置自己的线程池了。

最后欢迎大家加我私人微信,一起讨论,共同进步,拉你进读者群,2020 难过,我们一起抱团取暖!

巨人的肩膀


最后欢迎大家加我好友,拉你进技术交流群,群里有很多 BAT 的大咖,可以提问,互相交流,内推等,进群一起抱团取暖



巨人的肩膀

相关文章
|
8月前
|
安全 Java
Java多线程(全知识点)(下)
Java多线程(全知识点)(下)
74 0
|
2月前
|
算法 安全 Linux
万字详解并发编程!!!
本文介绍了并发编程的基本概念和技术,涵盖了操作系统的发展历程、进程与线程的原理和使用方法。主要内容包括: 操作系统发展史:从手工操作到多道程序系统、分时系统、实时系统,再到通用操作系统,逐步介绍了操作系统的演变过程。 并发编程技术:强调并发编程的目标是充分利用CPU资源,提高系统性能 进程:详细讲解了进程的概念、组成、状态、调度算法、进程间通信(IPC)以及守护进程和僵尸进程等问题。 线:介绍了线程的基本概念、与进程的区别、线程的创建、多线程共享资源、线程同步与互斥锁、递归锁和死锁问题 5. **队列**:讲解了队列的基本概念,包括先进先出队列、后进先出队列和优先级队列,并提供了具体的实现示例
145 38
|
8月前
|
Java 调度
Java多线程(全知识点)(上)
Java多线程(全知识点)
87 0
|
6月前
|
安全 Java 容器
第一篇:并发容器学习开篇介绍
第一篇:并发容器学习开篇介绍
50 4
|
8月前
|
缓存 Java 编译器
第一章 Java线程池技术应用
第一章 Java线程池技术应用
42 0
|
Java 编译器
java并发编程专栏(十一)
java并发编程专栏(十一)
70 0
|
存储 缓存 Java
Java线程池笔记
总结一下Java线程池相关八股文
230 0
|
缓存 Java 调度
手把手入门学习:Java线程池(代码详解)
本篇整理了线程池相关的代码案例,整理了好几天才实践完这篇文章,也希望自己对线程池有更深入的理解,欢迎阅读学习交流,分享获取新知,大家一起进步!
138 0
手把手入门学习:Java线程池(代码详解)
|
Java 开发者
手撕源码!线程池核心组件源码剖析
看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。
177 0
手撕源码!线程池核心组件源码剖析
|
存储 安全 Java
(十六)关于Java多线程锁的升级原理,这篇文章会让你另有收获
对象头用于存储对象的元数据信息,包括运行时数据和类型指针、实例数据存储的是真正有效数据、对齐填充主要补充字节,使得内存所占字节能被8整除。