Java线程池详解

简介: Java线程池详解

1. 核心参数


核心线程数(corePoolSize)


核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法


最大线程数(maximumPoolSize)


池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程


空闲线程存活时间(keepAliveTime)


当线程数大于核心时,多于的空闲线程最多存活时间


存活时间单位(unit)


keepAliveTime 参数的时间单位


工作队列


当线程数目超过核心线程数时用于保存任务的队列,此队列仅保存实现Runnable接口的任务。
主要有3种类型的BlockingQueue可供选择:


  • 无界队列
    队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool采用就是LinkedBlockingQueue
  • 有界队列
    常用的有两类:
  • FIFO原则的队列如ArrayBlockingQueue
  • 优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定

使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

  • 同步移交
    如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。

只有在使用无界线程池或者有饱和策略时才建议使用该队列。



线程工厂(threadFactory)


拒绝策略(rejectedHandler)


阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:


  • 中止(AbortPolicy)
    抛出错误RejectedExecutionException

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

           throw new RejectedExecutionException("Task " + r.toString() +

                                                " rejected from " +

                                                e.toString());

}


  • 抛弃当前(DiscardPolicy)
    抛弃当前的Runnable,这里是一个空方法,不执行任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}


  • 抛弃最旧的(DiscardOldestPolicy)
    在队列中弹出队首的任务,执行当前任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

           if (!e.isShutdown()) {

               e.getQueue().poll();

               e.execute(r);

           }

       }


  • 调用者运行(CallerRunsPolicy)
    直接运行任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

           if (!e.isShutdown()) {

               r.run();

           }

       }



2. 工作原理


数据字典


网络异常,图片无法展示
|

线程池的数据字典主要存储 线程池工作状态 工作线程数量 ,通过一个32位的Integer类型的原子类对象进行管理和维护,高3位存储线程池工作状态,低29位存储工作线程数量。


// 创建一个原子类对象用于计算线程的中状态

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

   // integer.size 为 32

   private static final int COUNT_BITS = Integer.SIZE - 3;

   // 低29位存线程池数量,这里默认为二进制的29个1,即最大值

   private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


   // 即高3位为111,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务

   private static final int RUNNING    = -1 << COUNT_BITS;

   // 即高3位为000,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务

   private static final int SHUTDOWN   =  0 << COUNT_BITS;

   // 即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务

   private static final int STOP       =  1 << COUNT_BITS;

   // 即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法

   private static final int TIDYING    =  2 << COUNT_BITS;

   // 即高3位为100,低29位为0,terminated()方法调用完成后变成此状态  

   private static final int TERMINATED =  3 << COUNT_BITS;


   // 用户计算线程的状态 32位中 高3位为1 低29位为0

   private static int runStateOf(int c)     { return c & ~CAPACITY; }

   // 用于计算线程池中线程的数量 32位中 高3位为0  低29位为1

   private static int workerCountOf(int c)  { return c & CAPACITY; }

   // rs 为 runState, wc 为 workerCount 通过工作状态和线程数量来计算出 ctl

   private static int ctlOf(int rs, int wc) { return rs | wc; }

字段

功能

实现

ctl

线程池状态、数量控制

AtomicInteger

runState

线程池状态

Integer(32位) 高3位控制

workCount

线程池数量

Integer(32位)低29位控制


线程池生命周期(RunState LifeCycle)

状态

含义

RUNNING

接收新任务,处理工作队列任务

SHUTDOWN

不再接收新任务,但是可以处理工作队列中任务

STOP

不再接收新任务,也不处理工作队列中的任务,打断正在进行中的任务

TIDYING

所有任务已经停止,工作线程数为0,下阶段将执行terminated() 钩子方法

TERMINATED

terminated()方法执行完成


工作流程


任务提交方式

任务提交方式

返回值

exec.execute(runnable)

exec.submit(runnable)


工作流程时序


核心方法


ThreadPoolExecutor.execute()


执行线程任务逻辑


public void execute(Runnable command) {

       if (command == null)

           throw new NullPointerException();

       // 获取ctl值

       int c = ctl.get();

       // workerCountOf计算当前工作线程数,如果比corePoolSize小

       if (workerCountOf(c) < corePoolSize) {

        // 执行addWorker方法

           if (addWorker(command, true))

               return;

           c = ctl.get();

       }

       // 执行到这里说明,工作线程数是不小于核心线程数的,需要进入队列处理

       // isRunning()判断线程池是否在运行状态,且放入工作队列queue线程任务成功

       if (isRunning(c) && workQueue.offer(command)) {

           int recheck = ctl.get();

           // 这里再次校验线程池工作状态

           // 如果是非running状态且执行remove成功则执行reject拒绝策略

           if (! isRunning(recheck) && remove(command))

               reject(command);

           // 如果不满足,则判断工作线程数是否为0

           else if (workerCountOf(recheck) == 0)

               addWorker(null, false);

       }

       // 不满足,则执行reject拒绝策略

       else if (!addWorker(command, false))

           reject(command);

   }


ThreadPoolExecutor.submit()


执行线程任务,支持返回值


public Future<?> submit(Runnable task) {

       if (task == null) throw new NullPointerException();

       RunnableFuture<Void> ftask = newTaskFor(task, null);

       execute(ftask);//执行的还是execute方法

       return ftask;

   }


executesubmit方法对比


方法

特殊处理

返回值

execute

submit

封装成RunnableFuture对象

支持


ThreadPoolExecutor.addWorker()


添加Worker工作线程任务的方法


private boolean addWorker(Runnable firstTask, boolean core) {

// 【1】主要是校验线程池状态,更新工作线程数量

// 循环断点标记

       retry:

       for (;;) {

        // 获取ctl元数据

           int c = ctl.get();

           // 获取线程池工作状态

           int rs = runStateOf(c);


           // Check if queue empty only if necessary.

           // 检查线程池工作状态、工作队列情况,保持可运行,否则直接返回

           if (rs >= SHUTDOWN &&

               ! (rs == SHUTDOWN &&

                  firstTask == null &&

                  ! workQueue.isEmpty()))

               return false;


           for (;;) {

            // 获取工作线程数

               int wc = workerCountOf(c);

               // 工作线程数校验,当工作线程数大于最大容量(几乎不可能)或 大于最大核心线程/最大线程数

               if (wc >= CAPACITY ||

                   wc >= (core ? corePoolSize : maximumPoolSize))

                   return false;

               // 增加工作线程数并更新ctl,更新成功则跳出该循环

               if (compareAndIncrementWorkerCount(c))

                   break retry;

               //重新读取线程池的ctl

               c = ctl.get();  // Re-read ctl

               // 如果CAS失败,则继续整个大循环进行更新逻辑

               if (runStateOf(c) != rs)

                   continue retry;

               // else CAS failed due to workerCount change; retry inner loop

           }

       }

 

//【2】线程任务入队操作

// worker启动完成的标识

       boolean workerStarted = false;

       // worker添加完成的标识

       boolean workerAdded = false;

       Worker w = null;

       try {

        //这里的Worker实现了Runnable,增加了线程任务的属性,new Worker(firstTask)开始创建线程,把传递进来的firstTask和Worker绑定了

           w = new Worker(firstTask);

           final Thread t = w.thread;

           if (t != null) {

               final ReentrantLock mainLock = this.mainLock;

               // ThreadPoolExecutor线程池的可重入锁控制

               mainLock.lock();

               try {

                   // Recheck while holding lock.

                   // Back out on ThreadFactory failure or if

                   // shut down before lock acquired.

                   // 获取线程池运行状态

                   int rs = runStateOf(ctl.get());

    // 如果是正常运行状态则进执行逻辑

                   if (rs < SHUTDOWN ||

                       (rs == SHUTDOWN && firstTask == null)) {

                       //防止线程被中断

                       if (t.isAlive()) // precheck that t is startable

                           throw new IllegalThreadStateException();

                       //存储Worker,这里使用的是HashSet<Worker>容器

                       workers.add(w);

                       //记录largestPoolSize,这里记录线程池水平情况,因为在执行addWorker方法之前已经做了MaxiumPoolSize判断,因此这里largestPoolSize<=MaxiumPoolSize;这里也可以把HashSet<Worker>当做是真实在执行线程的容器

                       int s = workers.size();

                       if (s > largestPoolSize)

                           largestPoolSize = s;

                       workerAdded = true;

                   }

               } finally {

                   mainLock.unlock();

               }

               //启动任务

               if (workerAdded) {

                //这里实际执行的是Worker中的run()方法,可以参考下面

                   t.start();

                   workerStarted = true;

               }

           }

       } finally {

        // 兜底处理,如果worker运行失败则执行

           if (! workerStarted)

            // 回滚ctl计数,在HashSet<Worker>移除Worker

               addWorkerFailed(w);

       }

       return workerStarted;

   }


ThreadPoolExecutor.reject()


线程池的拒绝策略执行方法


final void reject(Runnable command) {

// 根据配置的拒绝策略handler执行具体实现方法

       handler.rejectedExecution(command, this);

   }


ThreadPoolExecutor#Worker.runWorker()


ThreadPoolExecutor#Worker是一个内部类,它继承了AQS类,实现了Runnable接口,因此它具备线程、信号量同步等基础功能,它是ThreadPoolExecutor对线程任务内容的内部封装和加强


public void run() {

           runWorker(this);

}


final void runWorker(Worker w) {

       Thread wt = Thread.currentThread();

       Runnable task = w.firstTask;

       w.firstTask = null;

       w.unlock(); // allow interrupts

       //是否执行完成的标记,为最后的processWorkerExit方法提供决策判断

       boolean completedAbruptly = true;

       try {

        //这里会执行task线程任务

        //task != null 代表当前Worker封装的任务,有则拿当前Worker的任务,而且短路不执行后面

        //task = getTask()) != null 如果Worker当前封装的任务为空,要在工作队列queue中去拿任务,如果不为空则执行,为空

           while (task != null || (task = getTask()) != null) {

            //加锁 锁的是当前工作线程

               w.lock();

               // If pool is stopping, ensure thread is interrupted;

               // if not, ensure thread is not interrupted.  This

               // requires a recheck in second case to deal with

               // shutdownNow race while clearing interrupt

               // 根据线程池状态来判断是否要处理当前的线程任务

               if ((runStateAtLeast(ctl.get(), STOP) ||

                    (Thread.interrupted() &&

                     runStateAtLeast(ctl.get(), STOP))) &&

                   !wt.isInterrupted())

                   wt.interrupt();

               // 执行线程任务

               try {

                   beforeExecute(wt, task);

                   Throwable thrown = null;

                   try {

                       task.run();

                   } catch (RuntimeException x) {

                       thrown = x; throw x;

                   } catch (Error x) {

                       thrown = x; throw x;

                   } catch (Throwable x) {

                       thrown = x; throw new Error(x);

                   } finally {

                       afterExecute(task, thrown);

                   }

               } finally {

                   task = null;

                   w.completedTasks++;

                   w.unlock();

               }

           }

           completedAbruptly = false;

       } finally {

        //后置处理。判断是否线程复用,还是回收

           processWorkerExit(w, completedAbruptly);

       }

}


ThreadPoolExecutor.processWorkerExit()


Worker工作线程退出方法,处理工作线程回收


private void processWorkerExit(Worker w, boolean completedAbruptly) {

// 如果是立即中断,没有执行任务,减少工作线程数

       if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

           decrementWorkerCount();


       final ReentrantLock mainLock = this.mainLock;

       mainLock.lock();

       try {

           completedTaskCount += w.completedTasks;

           // 做HashSet移除

           workers.remove(w);

       } finally {

           mainLock.unlock();

       }


       tryTerminate();


       int c = ctl.get();

       // 这里判断线程池不是停止状态

       if (runStateLessThan(c, STOP)) {

        // 这里说明工作线程进行了有效任务的执行

           if (!completedAbruptly) {

            // 这里相当于对工作线程的核心线程数进行控制

               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

               if (min == 0 && ! workQueue.isEmpty())

                   min = 1;

               if (workerCountOf(c) >= min)

                // 如果工作线程数大于核心线程数则将当前工作线程return掉也就是释放掉,相当于进行了回收

                   return; // replacement not needed

           }

           // 调用addWorker保持这个线程执行,相当于复用,如果没任务会去WorkQueue这个阻塞队列中去获取任务

           addWorker(null, false);

       }

   }


ThreadPoolExecutor.getTask()


private Runnable getTask() {

       boolean timedOut = false; // Did the last poll() time out?


       for (;;) {

        // 获取线程池runState工作状态

           int c = ctl.get();

           int rs = runStateOf(c);


           // Check if queue empty only if necessary.

           // 校验状态进行工作线程数量回收更新

           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

               decrementWorkerCount();

               return null;

           }

  // 获取工作线程总数

           int wc = workerCountOf(c);


           // Are workers subject to culling?

           boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


           if ((wc > maximumPoolSize || (timed && timedOut))

               && (wc > 1 || workQueue.isEmpty())) {

               if (compareAndDecrementWorkerCount(c))

                   return null;

               continue;

           }


           try {

               Runnable r = timed ?

                // 设置超时时间 阻塞获取队列任务

                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                   // 阻塞获取队列任务

                   workQueue.take();

               if (r != null)

                   return r;

               timedOut = true;

           } catch (InterruptedException retry) {

               timedOut = false;

           }

       }

   }


异常处理


异常流转时序


关于线程池任务的提交分为execute、submit两种方式。

网络异常,图片无法展示
|

任务提交方式

异常流转大致节点

execute

①Runnable的try-catch ↓ ②ThreadPoolExecutor的afterExecute方法 ↓ ③ThreadFactory的UncaughtExceptionHandler方法

submit

①Runnable的try-catch ↓ ②FutureTask的run方法,有异常的话会通过setException(ex)收集到Future对象的outcome属性中暂存,当Future.get()方法执行时会根据线程任务执行状态进行report上报,如果有异常会进行抛出


3. 特殊扩展


异常捕获


一般来说有如下几种方式进行线程池异常的处理,根据任务提交execute、submit方式进行区分和适配。
线程池执行submit方法的底层实际也是对execute进行了调用,只是封装了入参对象FutureTask对象,支持返回对象值,也正是因为封装了FutureTask对于异常处理更为特殊,内部实现了try-catch捕获将异常对象调用setException(ex)进行封装,主要是通过Future.get()方法触发report()进行异常上报抛出。
异常一定要进行捕获处理,不要以静默吞噬异常方式忽略它,否则会失控。

任务提交方式

处理方法

注意事项

execute

① ② ③

异常信息传递层较多

submit

需要通过future.get()来触发


  • ① 直接try/catch捕获异常进行处理
  • UncaughtExceptionHandler机制
  • 线程直接重写整个方法:

Thread t = new Thread();

      t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

          public void uncaughtException(Thread t, Throwable e) {

             LOGGER.error(t + " throws exception: " + e);

          }

       });

//如果是线程池的模式:

       ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {

           Thread t = new Thread(r);

           t.setUncaughtExceptionHandler(

               (t1, e) -> LOGGER.error(t1 + " throws exception: " + e));

           return t;

       });


  • 重写线程池UncaughtExceptionHandler机制

ThreadFactory executorThreadFactory = new BasicThreadFactory.Builder()

       .namingPattern("task-executor-%d")

       .uncaughtExceptionHandler(new LogUncaughtExceptionHandler(LOGGER))

       .build();

Executors.newSingleThreadExecutor(executorThreadFactory);


  • ③ 重写protected void afterExecute(Runnable r, Throwable t) {}方法


线程复用


线程回收


总的来说,ThreadPoolExecutor回收线程都是等getTask()获取不到任务,返回null时,调用processWorkerExit()方法从hashSet集合中remove掉线程Worker,getTask()返回null又分为2两种场景:


  • 线程正常执行完任务,并且已经等到超过keepAliveTime时间,大于核心线程数,那么会返回null,结束外层的runWorker中的while循环
  • 当调用shutdown()方法,会将线程池状态置为SHUTDOWN,并且需要等待正在执行的任务执行完,阻塞队列中的任务执行完才能返回null


4. 常用线程池


newCachedThreadPool


public static ExecutorService newCachedThreadPool() {

       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                     60L, TimeUnit.SECONDS,

                                     new SynchronousQueue<Runnable>());

}

参数

核心线程数

0

最大线程数

MAX

空闲线程回收时间

60秒

工作队列

SynchronousQueue

饱和策略

AbortPolicy


newFixedThreadPool


public static ExecutorService newFixedThreadPool(int nThreads) {

       return new ThreadPoolExecutor(nThreads, nThreads,

                                     0L, TimeUnit.MILLISECONDS,

                                     new LinkedBlockingQueue<Runnable>());

   }

参数

核心线程数

nThreads 设置

最大线程数

nThreads 设置

空闲线程回收时间

0秒(不回收)

工作队列

LinkedBlockingQueue(无界队列)

饱和策略

AbortPolicy


newScheduledThreadPool


public ScheduledThreadPoolExecutor(int corePoolSize) {

       super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

             new DelayedWorkQueue());

   }

参数

核心线程数

corePoolSize设置

最大线程数

MAX

空闲线程回收时间

0秒(不回收)

工作队列

DelayedWorkQueue(延迟优先级队列)

饱和策略

AbortPolicy


newSingleThreadExecutor


public static ExecutorService newSingleThreadExecutor() {

       return new FinalizableDelegatedExecutorService

           (new ThreadPoolExecutor(1, 1,

                                   0L, TimeUnit.MILLISECONDS,

                                   new LinkedBlockingQueue<Runnable>()));

   }

参数

核心线程数

1

最大线程数

1

空闲线程回收时间

0秒(不回收)

工作队列

LinkedBlockingQueue(无界队列)

饱和策略

AbortPolicy


5. 优点


  • 工作线程可复用,避免频繁创建线程带来的性能损耗
  • 阻塞队列可以很好地控制线程资源的收放,起到缓冲池作用


6. 注意事项


使用规约要求


【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors 返回的线程池对象的弊端如下:


1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。


2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。


参考


线程池中的阻塞队列选择
java 线程池(线程的复用)
线程池中的工作线程如何被回收
线程池的工作原理与源码解读
Java 线程池的异常处理机制
深度解析Java线程池的异常处理机制
ThreadPool中变量ctl的分析
Java中线程池ThreadPoolExecutor原理探究

相关文章
|
4天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
3月前
|
Java 调度 数据库
Java并发编程:深入理解线程池
在Java并发编程的海洋中,线程池是一艘强大的船,它不仅提高了性能,还简化了代码结构。本文将带你潜入线程池的深海,探索其核心组件、工作原理及如何高效利用线程池来优化你的并发应用。
|
3月前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
111 1
|
3月前
|
安全 Java 数据库
一天十道Java面试题----第四天(线程池复用的原理------>spring事务的实现方式原理以及隔离级别)
这篇文章是关于Java面试题的笔记,涵盖了线程池复用原理、Spring框架基础、AOP和IOC概念、Bean生命周期和作用域、单例Bean的线程安全性、Spring中使用的设计模式、以及Spring事务的实现方式和隔离级别等知识点。
|
3月前
|
存储 监控 安全
一天十道Java面试题----第三天(对线程安全的理解------>线程池中阻塞队列的作用)
这篇文章是Java面试第三天的笔记,讨论了线程安全、Thread与Runnable的区别、守护线程、ThreadLocal原理及内存泄漏问题、并发并行串行的概念、并发三大特性、线程池的使用原因和解释、线程池处理流程,以及线程池中阻塞队列的作用和设计考虑。
|
16天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
96 38
|
16天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
46 4
|
16天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
76 2
|
18天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
2月前
|
Java 调度 开发者
Java并发编程:深入理解线程池
在Java的世界中,线程池是提升应用性能、实现高效并发处理的关键工具。本文将深入浅出地介绍线程池的核心概念、工作原理以及如何在实际应用中有效利用线程池来优化资源管理和任务调度。通过本文的学习,读者能够掌握线程池的基本使用技巧,并理解其背后的设计哲学。