前言导读
由Doug Lea在JCP JSR-166 专家组成员的协助下撰写,并已发布到公共领域,如
creativecommons.org/publicdomai…
一个ExecutorService ,它使用可能是多个池线程中的一个来执行每个提交的任务,通常使用Executors工厂方法对其进行配置。 线程池解决了两个不同的问题:由于减少了每个任务的调用开销,它们通常在执行大量异步任务时提供改进的性能,并且它们提供了一种绑定和管理资源(包括线程)的方法,该资源在执行集合时消耗掉了任务。 每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。 为了在广泛的上下文中有用,该类提供了许多可调整的参数和可扩展性挂钩。 但是,建议程序员使用更方便的Executors工厂方法
Executors.newCachedThreadPool (无边界线程池,具有自动线程回收),
Executors.newFixedThreadPool (固定大小的线程池)和
Executors.newSingleThreadExecutor (单个后台线程),这些方法可以预先配置设置。最常见的使用场景。 否则,在手动配置和调整此类时,请使用以下指南:
核心和最大池大小
ThreadPoolExecutor将根据corePoolSize(请参见getCorePoolSize )和getCorePoolSize (请参见getMaximumPoolSize )设置的界限自动调整池大小(请参见getPoolSize )。 当在方法execute(Runnable)提交新任务,并且正在运行的线程少于corePoolSize线程时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。 如果运行的线程数大于corePoolSize但小于maximumPoolSize,则仅在队列已满时才创建新线程。 通过将corePoolSize和maximumPoolSize设置为相同,可以创建固定大小的线程池。 通过将maximumPoolSize设置为一个本质上不受限制的值(例如Integer.MAX_VALUE ,可以允许地容纳任意数量的并发任务。 最典型的,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize和setMaximumPoolSize动态更改。
按需施工
默认情况下,甚至核心线程也仅在新任务到达时才开始创建和启动,但是可以使用prestartCoreThread或prestartAllCoreThreads方法动态地覆盖它。 如果使用非空队列构造池,则可能要预启动线程。
创建新线程
使用ThreadFactory创建新线程。 如果没有另外指定,则使用
Executors.defaultThreadFactory ,该线程创建的线程全部位于相同的ThreadGroup并且具有相同的NORM_PRIORITY优先级和非守护程序状态。 通过提供其他ThreadFactory,可以更改线程的名称,线程组,优先级,守护程序状态等。如果在通过询问newThread返回null来询问ThreadFactory无法创建线程时,执行器将继续执行,但可能无法执行执行任何任务。 线程应具有“ modifyThread” RuntimePermission 。 如果使用该池的工作线程或其他线程不具有此许可权,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能保持在可能终止但未完成的状态。
保活时间
如果当前池中的线程数超过corePoolSize,则多余的线程将在空闲时间超过keepAliveTime时终止(请参见getKeepAliveTime(TimeUnit) )。 当不积极使用池时,这提供了一种减少资源消耗的方法。 如果池稍后变得更加活跃,则将构建新线程。 也可以使用setKeepAliveTime(long, TimeUnit)方法动态更改此参数。 使用Long.MAX_VALUE TimeUnit.NANOSECONDS的值Long.MAX_VALUE有效地使空闲线程永远不会在关闭之前终止。 默认情况下,仅当corePoolSize线程数多时,保持活动策略才适用。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)还可用于将此超时策略应用于核心线程。
排队
任何BlockingQueue均可用于传输和保留提交的任务。 此队列的使用与池大小交互: 如果正在运行的线程少于corePoolSize线程,则执行程序总是喜欢添加新线程,而不是排队。 如果正在运行corePoolSize或更多线程,则执行程序总是更喜欢对请求进行排队,而不是添加新线程。 如果无法将请求放入队列中,则将创建一个新线程,除非该线程超过了maximumPoolSize,在这种情况下,该任务将被拒绝。
有三种一般的排队策略:
直接交接。 对于工作队列,一个很好的默认选择是SynchronousQueue ,它可以将任务移交给线程,而不必另外保留它们。 在这里,如果没有立即可用的线程来运行任务,则尝试将其排队的尝试将失败,因此将构造一个新线程。 在处理可能具有内部依赖项的请求集时,此策略避免了锁定。 直接切换通常需要无限制的maximumPoolSizes以避免拒绝新提交的任务。 反过来,当平均而言,命令继续以比其处理速度更快的速度到达时,这可能会带来无限线程增长的可能性。
无限队列。 使用无界队列(例如,没有预定义容量的LinkedBlockingQueue )将在所有corePoolSize线程繁忙时使新任务在队列中等待。 因此,将仅创建corePoolSize线程。 (因此,maximumPoolSize的值没有任何作用。)当每个任务完全独立于其他任务时,这可能是适当的,因此任务不会影响彼此的执行。 例如,在网页服务器中。 尽管这种排队方式对于消除短暂的请求突发很有用,但它承认当命令平均继续以比处理速度更快的速度到达时,无限制的工作队列增长是可能的。
有界队列。 当与有限的maximumPoolSizes一起使用时,有界队列(例如ArrayBlockingQueue )有助于防止资源耗尽,但调优和控制起来会更加困难。 队列大小和最大池大小可能会相互折衷:使用大队列和小池可以最大程度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为地降低吞吐量。 如果任务频繁阻塞(例如,如果它们受I / O约束),则系统可能能够安排比您原先允许的线程更多的时间。 使用小队列通常需要更大的池大小,这会使CPU繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。
被拒绝的任务
在方法提交新的任务execute(Runnable)将在执行程序已关闭了拒绝,并且也当执行器使用有限的边界两个最大线程和工作队列容量,且饱和。 在任一情况下, execute方法调用
RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)其的方法RejectedExecutionHandler 。 提供了四个预定义的处理程序策略:
在默认的
ThreadPoolExecutor.AbortPolicy ,处理程序在拒绝时会抛出运行时RejectedExecutionException 。
在
ThreadPoolExecutor.CallerRunsPolicy ,调用execute自己的线程运行任务。 这提供了一种简单的反馈控制机制,该机制将减慢新任务的提交速度。
在
ThreadPoolExecutor.DiscardPolicy ,简单地删除了无法执行的任务。
在
ThreadPoolExecutor.DiscardOldestPolicy ,如果未关闭执行程序,则将丢弃工作队列开头的任务,然后重试执行(该操作可能再次失败,导致重复执行此操作)。
可以定义和使用其他种类的RejectedExecutionHandler类。 这样做需要格外小心,尤其是在设计策略仅在特定容量或排队策略下才能工作时。
挂钩方法
此类提供protected可重写的beforeExecute(Thread, Runnable)和afterExecute(Runnable, Throwable)方法,这些方法在每个任务执行前后被调用。 这些可以用来操纵执行环境。 例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。 另外,一旦执行程序完全终止,可以terminated方法terminated以执行需要执行的任何特殊处理。 如果钩子或回调方法引发异常,内部工作线程可能进而失败并突然终止。
队列维护
方法getQueue()允许访问工作队列,以进行监视和调试。 强烈建议不要将此方法用于任何其他目的。 当取消大量排队的任务时,可以使用提供的两种方法remove(Runnable)和purge来帮助回收存储。 定案 这在程序不再被引用,也没有剩余的线程将成为池shutdown自动。 如果即使在用户忘记调用shutdown也要确保回收未引用的池,则必须通过使用零核心线程的下限和/或设置allowCoreThreadTimeOut(boolean)来设置适当的保活时间,以安排未使用的线程最终死掉allowCoreThreadTimeOut(boolean) 。
扩展示例。 此类的大多数扩展都覆盖一个或多个受保护的hook方法。 例如,以下是一个子类,它添加了一个简单的暂停/继续功能:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch (InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }
想要了解透彻线程池,先了解一下线程吧
以下基于JDK1.8介绍:
摘自源码片段:一些核心的定义
private volatile String name; // 线程的名字 // 线程的优先级,默认为5,可自行设置,越大代表可以获得的时间片几率越高 private int priority; /* 是否是守护线程,守护线程在JVM结束时自动销毁 */ private boolean daemon = false; /* 将要运行的目标. */ private Runnable target; /* 线程组-就是给线程分组,挺简单,初始化会被分配,与线程池无直接联系 */ private ThreadGroup group; /* 此线程的上下文ClassLoader */ private ClassLoader contextClassLoader; /* The inherited AccessControlContext of this thread */ private AccessControlContext inheritedAccessControlContext; /* 用于命名是哪个线程的编号 */ private static int threadInitNumber; private static synchronized int nextThreadNum() { return threadInitNumber++; } /* 与此线程有关的ThreadLocal值。该映射由ThreadLocal类维护 */ ThreadLocal.ThreadLocalMap threadLocals = null; /* *与此线程有关的InheritableThreadLocal值。该映射由InheritreadLableThocal类维护. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; /* 此线程请求的堆栈大小,如果创建者未指定堆栈大小,则为0。 VM可以根据此数字执行*喜欢的任何事情;一些虚拟机将忽略它. */ private long stackSize; /* * Thread ID */ private long tid; /* 用于生成线程ID */ private static long threadSeqNumber; /* Java thread status */ private volatile int threadStatus = 0;
注意几个重要的方法:
1.有一个start方法,这个方法里面调用了操作系统,利用操作系统去调用我们的run方法。
private native void start0();
\2. interruput方法,这只是一个标志,不会立即中断
interrupted()是静态方法:内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
isInterrupted()是实例方法,是调用该方法的对象所表示的那个线程的isInterrupted(),不会重置当前线程的中断状态
\3. join 面试常问:其实是通过wait来阻塞线程,例如:t1.join(),无限制阻塞t1完成,再继续执行下面的方法。
\4. getAllStackTraces 获取所有线程的堆栈信息,可以用来扩展监控。
其他方法大家看看就行。
下面讲讲线程的状态:
private volatile String name; // 线程的名字 // 线程的优先级,默认为5,可自行设置,越大代表可以获得的时间片几率越高 private int priority; /* 是否是守护线程,守护线程在JVM结束时自动销毁 */ private boolean daemon = false; /* 将要运行的目标. */ private Runnable target; /* 线程组-就是给线程分组,挺简单,初始化会被分配,与线程池无直接联系 */ private ThreadGroup group; /* 此线程的上下文ClassLoader */ private ClassLoader contextClassLoader; /* The inherited AccessControlContext of this thread */ private AccessControlContext inheritedAccessControlContext; /* 用于命名是哪个线程的编号 */ private static int threadInitNumber; private static synchronized int nextThreadNum() { return threadInitNumber++; } /* 与此线程有关的ThreadLocal值。该映射由ThreadLocal类维护 */ ThreadLocal.ThreadLocalMap threadLocals = null; /* *与此线程有关的InheritableThreadLocal值。该映射由InheritreadLableThocal类维护. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; /* 此线程请求的堆栈大小,如果创建者未指定堆栈大小,则为0。 VM可以根据此数字执行*喜欢的任何事情;一些虚拟机将忽略它. */ private long stackSize; /* * Thread ID */ private long tid; /* 用于生成线程ID */ private static long threadSeqNumber; /* Java thread status */ private volatile int threadStatus = 0;
线程了解差不多了,接下来看看线程池吧!
线程池ThreadPoolExecutor
看看线程池的UML图吧
我们从上往下依次分析:
/ ** *在将来的某个时间执行给定命令。由 Executor实现决定,命令可以在新线程 池或调用线程中执行。 @param命令可运行任务,如果无法接受此任务, 则@throws RejectedExecutionException 如果命令为null,则@throws NullPointerException * / void execute(Runnable command);
简单来说就是调度线程来执行任务,用户只需提供Runnable对象,将任务的运行**逻辑提交到执行器****(**Executor)中
ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。可以从上面UML图简单看出。
public interface ExecutorService extends Executor { // 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。 boolean awaitTermination(long timeout, TimeUnit unit); // 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); // 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit); // 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks); // 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit); // 如果此执行程序已关闭,则返回 true。 boolean isShutdown(); // 如果关闭后所有任务都已完成,则返回 true。 boolean isTerminated(); // 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。 void shutdown(); // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 List<Runnable> shutdownNow(); // 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。 <T> Future<T> submit(Callable<T> task); // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 Future<?> submit(Runnable task); // 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 <T> Future<T> submit(Runnable task, T result); }
AbstractExecutorService则是上层的抽象类,这里延伸出Future,简单的说就是获取异步执行的结果,例如在Netty中,我们处理消息是通过一个双向链表来处理的,需要对消息一层层处理,所以说这里也用到了Future来获取消息处理的结果。
最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
线程池生命周期
线程池的生命周期也就是线程池在运行时所经历的线程池状态。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 32 -3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 1 << 29 - 1 = 2^29 -1 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; // -2^29 = 11100000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 0 = 00000000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 2^29 = 00100000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 2*2^29 = 01000000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 3*2^29 = 01100000000000000000000000000000 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 线程池运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
如上代码中给出了线程池状态的二进制数据,下面分别描述一下
- RUNNING: 能接受新提交的任务,并且也能处理阻塞队列中的任务。
- SHUTDOWN: 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
- STOP : 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
- TIDYING : 所有的任务都已经终止了,workerCount(有效线程数)为0。
- TERMINATED : 在terminated()方法执行完成后进入该状态。
线程池运行流程
本文的核心重点。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务
我们直接看一下源码,这样比较直观,印象比较深刻,代码不难。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果小于核心线程数 if (addWorker(command, true)) return; c = ctl.get(); } // offer就是如果队列未满就添加到队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果队列也满了,就直接起一个线程,失败走拒绝策略 else if (!addWorker(command, false)) reject(command); }
下面我们来看看addwork相关部分代码去掉了部分条件判断
private boolean addWorker(Runnable firstTask, boolean core) { if (compareAndIncrementWorkerCount(c)) break retry; // 增加线程数,跳出循环 try { w = new Worker(firstTask); //this.thread = getThreadFactory().newThread(this); final Thread t = w.thread; // 这里通过线程工厂new一个线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 独占锁- try { int rs = runStateOf(ctl.get());// 获取线程池状态 if (rs < SHUTDOWN || // 线程池在运行或者shutdown (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);// 添加任务到阻塞队列 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程 workerStarted = true; } } return workerStarted;
上面可以看到t.start()开启了系统线程调度,接下来在跟下run方法
public void run() { runWorker(this); }
可以看到,接下来执行了runworker(this),this就是刚刚加入的w任务。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 以独占的方式释放资源 boolean completedAbruptly = true; try { // 如果task!= null,就getTask获取一个任务 while (task != null || (task = getTask()) != null) { w.lock(); // 1.以独占额方式获得资源,忽略异常 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 2.可扩展:用于重新初始化 threadlocals 或者执行日志记录。 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 任务执行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 线程回收 processWorkerExit(w, completedAbruptly); } }
分析一下getTask
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
简单的说如果设置了核心线程可以超时=true或者当前线程数>核心线程数,就限时获取任务,否则就阻塞获取任务。
逻辑其实都很简单,有些东西还是需要我们仔细分析一下:例如代码中
第一点
1.w.lock() 2.public void lock() { acquire(1); } 3.public final void acquire(int arg) { // class AbstractQueuedSynchronizer if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
可以看到这里是直接调用的AQS的独占锁-公平锁实现方式,而在线程回收processWorkerExit 这个方法使用的是AQS的独占锁-非公平锁
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); // 默认使用非公平锁 new NonfairSync() final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 锁内实现移除任务,同时也移除了Thread引用 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试中断线程,如果线程池正在关闭,则关闭线程池 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 如果线程池没有停止 if (!completedAbruptly) { // 没有异常结束 // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务 if (min == 0 && ! workQueue.isEmpty()) min = 1; // // 线程数不为空 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.线程异常退出 // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理 addWorker(null, false); } }
简单分析一下线程回收流程:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 这里可以看到只要线程数!=0,线程就可以被回收 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; }
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 这里看到进行了trylock判断 if (!t.isInterrupted() && w.tryLock()) { try { // 进行线程中断标识 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
例如:A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程在tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。
再以CountDownLatch为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会**unpark()**主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、
tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
任务拒绝
线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个公共接口,说明我们可以自定义扩展,其设计如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
我们看看JDK提供的几种拒绝策略:
一般业务线程采用:调用提交任务的线程去处理(前提是所有任务都执行完毕)
ThreadPoolExecutor.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** 调用者线程中执行任务r * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
业务实战
场景1:快速响应用户请求 B/S
从用户体验角度看,这个结果响应的越快越好,如果一个页面半天都刷不出,用户可能就放弃查看这个商品了。而面向用户的功能聚合通常非常复杂,伴随着调用与调用之间的级联、多级级联等情况,业务开发同学往往会选择使用线程池这种简单的方式,将调用封装成任务并行的执行,缩短总体响应时间。另外,使用线程池也是有考量的,这种场景最重要的就是获取最大的响应速度去满足用户,所以应该不设置队列去缓冲并发任务,调高****corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
场景2:快速处理批量任务
离线的大量计算任务,需要快速执行。比如说,统计某个报表,需要计算出全国各个门店中有哪些商品有某种属性,用于后续营销策略的分析,那么我们需要查询全国所有门店中的所有商品,并且记录具有某属性的商品,然后快速生成报表
这种场景需要执行大量的任务,我们也会希望任务执行的越快越好。这种情况下,也应该使用多线程策略,并行计算。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时地完成,而是关注如何使用有限的资源,尽可能在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize去设置处理任务的线程数。在这里,设置的线程数过多可能还会引发线程上下文切换频繁的问题,也会降低处理任务的速度,降低吞吐量。
场景3:队列设置过长
由于队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长,最终导致下游服务的大量调用超时失败
那么具体数值怎么配置呢?
可以看出,这些计算公式都偏离了实际的业务场景。I/O密集型和CPU密集型差别很大,不过都跟CPU核心数挂钩的,I/O密集型任务常常需要我们进行线程池参数动态化,所有线程池也非常友好的提供了几个公共方法,供我们动态配置线程池的线程核心数和线程最大数和阻塞队列大小。
除了这些,我们前面提到的拒绝策略和任务执行前处理和任务执行后处理都可以作为我们对线程池的扩展。通过这些配置,我们可以实现对线程池的动态参数调整,任务执行情况,队列负载情况,监控,日志等等。
这里给出任务前置/后置处理的扩展做一个监控:
public class TimingThreadPool extends ThreadPoolExecutor { public TimingThreadPool() { super(1, 1, 0L, TimeUnit.SECONDS, null); } private static final Logger logger = LoggerFactory.getLogger(TimingThreadPool.class); private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); logger.info(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { try { long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); logger.info(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); } finally { super.afterExecute(r, t); } } @Override protected void terminated() { try { logger.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get())); } finally { super.terminated(); } } }