分析线程池中的keepAliveTime参数具体实现
参数意义
创建线程池时,有一个重要参数就是keepAliveTime,标记线程空闲多久后被释放。
那么他到底是怎么实现的呢?
猜想
有一个线程在维护时间,可笑。专门有一个线程去维护,浪费资源,而且时间也不够精确。而且还要开辟空间记录线程开始空闲的时间,消耗空间。
源码分析
java.util.concurrent.ThreadPoolExecutor#getTask
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 当前线程数 int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 这一步是关键,需要了解poll和take的区别,take会进行阻塞。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
- poll 方法作用是移除并返回队列的头节点。但是如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null 作为提示。
- 带时间参数的 poll 方法:如果能够移除,便会立刻返回这个节点的内容;如果队列是空的就会进行等待,等待时间正是我们指定的时间,直到超时时间到了,如果队列里依然没有元素可供移除,便会返回 null 作为提示。
- take 方法的作用是获取并移除队列的头结点。通常在队列里有数据的时候会正常取出数据并删除;但是如果执行 take 的时候队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
- timed为true则标志着:允许核心线程超时被释放或者当前线程数超过核心线程数。一旦为tue,就会去使用阻塞对垒的poll方法,如果keepAliveTime的时间里获取不到任务,就会返回Null,在上一级,也就是runWorker方法中去释放资源。
java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // while循环获取任务,如果获取不到,就跳出循环,在finally中进行释放。 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 去释放当前的worker w。 processWorkerExit(w, completedAbruptly); } }
阻塞队列的poll方法又是怎么实现超时的呢?
但是我们较真来看,打破砂锅问到底,阻塞队列又是如何做到去完成超时的呢?
以java.util.concurrent.DelayQueue为例
源码分析
java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit)
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) // 已经超时就返回null return null; else // 未超时就去await nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) // 未超时就去await nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 未超时就去await long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
可以看到反复调用了available.awaitNanos(delay)方法,继续往下看
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#awaitNanos
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) // 这一步去做了等待 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
看到执行了LockSupport.parkNanos(this, nanosTimeout);
java.util.concurrent.locks.LockSupport#parkNanos(java.lang.Object, long)
public static void parkNanos(Object blocker, long nanos) { if (nanos > 0) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, nanos); setBlocker(t, null); } }
执行了UNSAFE.park(false, nanos);
sun.misc.Unsafe#park
public native void park(boolean var1, long var2);
这是一个native方法,一般来说到了native我就不看了…c++都还给夫子了。
网上搜搜看吧!!
转载分析本地方法的博客:https://www.jianshu.com/p/37ef66eddca6
也不是特别详细,没有到操作系统底层。
个人猜测操作系统是包了一层while循环,在未超时时就放弃这个CPU时间片,让出CPU,等待下次调度再做判断。
总结
不算太虎头蛇尾吧,至少把jdk中是怎么做到实现keepAliveTime参数的源码给看懂了。
欢迎大家交流指出JRE中阻塞队列调用的UNSAFE如何做到超时,以及对应到操作系统如何配合这个超时指令。