addWorker()中会取出当前队列中的第一个线程并调用start()方法开启
其中线程 t 由以下代码获取
观察Worker的构造方法,使用 getThreadFactory 工厂创建一个线程:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
线程中的run方法调用runWorker方法,对应上图绿色部分
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
5.5.2 tryTerminate*
5.5.3 runWorker*
执行流程如下图所示:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); /** * 同时满足如下两个条件,则执行wt.interrupt() * 1> 线程状态为STOP、TIDYING、TERMINATED或者(当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED) * 2> 当前线程wt是否被标记中断 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker方法的核心是调用了getTask方法,即上图中绿色框部分。
5.5.4 getTask*
private Runnable getTask() { // 表示上次从阻塞队列中获取任务是否超时 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 同时满足如下两点,则线程池中工作线程数减1,并返回null * 1> rs >= SHUTDOWN,表示线程池不是RUNNING状态 * 2> rs >= STOP 表示STOP、TIDYING和TERMINATED这三个状态,它们共同点就是【不接收新任务】也【不处理workQueue里的线程任务】 or 阻塞队列workQueue为空 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 线程池中工作线程数 - 1 return null; } int wc = workerCountOf(c); // timed用于判断是否需要进行超时控制,当allowCoreThreadTimeOut被设置为ture或者活跃线程数大于核心线程数,则需要进行超时控制 // allowCoreThreadTimeOut默认为false,则表明核心线程不允许超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 同时满足以下两种情况,则线程池中工作线程数减1并返回nul1: * case1:当前活动线程数workCount大于最大线程数,或者需要超时控制(timed = true)并且上次从阻塞队列中获取任务 * case2:如果有效线程数大于1,或者阻塞队列为空。 */ if ((wc > maximumPoolSize // 因为在执行该方法的同时被执行了setMaximumPoolSize,导致最大线程数被缩小 || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 线程池中工作线程数 - 1 return null; // 如果 - 1失败,则循环重试 continue; } try { // 如果需要超时控制,则通过阻塞队列的pol1方法进行超时控制, // 否则,直接获取,如果队列为空,task方法会阻塞直到队列不为空 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // pol1-->若队列为空,返回null workQueue.take(); // take-->若队列为空,发生阻塞,等待元素 if (r != null) return r; // 如果r=nul1,表示超时了,则timeOut设置为true,标记为上一次超时状态 timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
7 ThreadPoolExecutor 的执行方法
执行方法有两个,分别是 execute() 和 submit(),最主要的区别就是 submit() 方法可以接受线程池执行的返回值,而 execute() 不能接收返回值。
示例代码:
public static void main(String[] args) throws Exception{ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20)); // execute 使用例子 executor.execute(new Runnable() { @Override public void run() { System.out.println("我是一个 execute"); } }); // submit 使用例子 Future<String> future = executor.submit(new Callable<String>() { @Override public String call() throws Exception { return "我是一个 submit"; } }); System.out.println(future.get()); }
另一个区别是 execute() 方法 属于 顶级接口 Executor 的方法 ,而 submit() 属于 子类接口 ExecutorService 的方法。
8 线程的拒绝策略
当线程池中的任务队列已满,再有任务来添加时会先判断当前线程池中的线程数是否大于等于线程池的最大值,如果是,则会触发线程池的拒绝策略。
自带的拒绝策略有4种:
AbortPolicy :终止策略,线程池抛出一个异常并终止执行,是默认策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
CallerRunsPolicy :把任务交给当前线程执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardPolicy : 丢弃新进来的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
DiscardOldestPolicy :丢弃最早的任务(最先加入队列中的任务)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
8.1 自定义拒绝策略
自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象,然后重写其 rejectedExecution 方法即可。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 3, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2) , new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("我是自定义的拒绝策略"); } }); for (int i = 0; i < 6; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); }
9 面试题
1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;
2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);
3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;
4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。
corePoolSize=10, maximumPoolSize=10,queueSize = 10
20个并发任务过来,有多少个活跃线程?
10个。corePoolSize = maximumPoolSize 定长线程池,corePoolSize先打满,queueSize也满
队列里面有几个线程?
10个。corePoolSize先打满,queueSize也满。
如果有21个并发队列过来呢?
corePoolSize先打满,queueSize也满还多了一个,这个时候如果是丢弃策略就丢弃。
corePoolSize=10, maximumPoolSize=20,queueSize = 10?
20个并发任务过来,有多少个活跃线程?
10个。corePoolSize打满,queueSize 也满
21个并发任务过来,有多少个活跃线程?
11个。corePoolSize打满,queueSize 也满还多一个,maximumPoolSize = 20,所以corePoolSize + 1此时活跃的为11个。
30个并发任务过来,有多少个活跃线程?
20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20,此时有20个活跃任务。
31个并发任务过来,有多少个活跃线程?
20个。corePoolSize打满,queueSize 也满,corePoolSize扩充至20还多一个,如果是丢弃策略,此时有20个活跃任务。