schedule:延迟多长时间之后只执行一次;
scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行;
scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;
private void delayedExecute(RunnableScheduledFuture<?> task) { //如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉 if (isShutdown()) reject(task); else { //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列 super.getQueue().add(task); //如果当前状态无法执行任务,则取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker; //增加Worker,确保提交的任务能够被执行 ensurePrestart(); } }
offer方法:
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) // 容量扩增50%。 grow(); size = i + 1; // 第一个元素,其实这里也可以统一进行sift-up操作,没必要特别判断。 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 插入堆尾。 siftUp(i, e); } // 如果新加入的元素成为了堆顶,则原先的leader就无效了。 if (queue[0] == e) { leader = null; // 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。 available.signal(); } } finally { lock.unlock(); } return true; }
siftup方法:
private void siftUp(int k, RunnableScheduledFuture<?> key) { // 找到父节点的索引 while (k > 0) { // 获取父节点 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了 if (key.compareTo(e) >= 0) break; // 如果key.compareTo(e) < 0, // 说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面 queue[k] = e; setIndex(e, k); // 设置索引为k k = parent; } // key设置为排序后的位置中 queue[k] = key; setIndex(key, k); }
任务执行:
public void run() { // 是否周期性,就是判断period是否为0。 boolean periodic = isPeriodic(); // 检查任务是否可以被执行。 if (!canRunInCurrentRunState(periodic)) cancel(false); // 如果非周期性任务直接调用run运行即可。 else if (!periodic) ScheduledFutureTask.super.run(); // 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。 else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。 reExecutePeriodic(outerTask); } }
public void run() { private void setNextRunTime() { long p = period; /* * fixed-rate模式,时间设置为上一次时间+p。 * 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。 * 如果这次任务还没执行完是肯定不会执行下一次的。 */ if (p > 0) time += p; /** * fixed-delay模式,计算下一次任务可以被执行的时间。 * 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。 */ else time = triggerTime(-p); } long triggerTime(long delay) { /* * 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。 * * 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。 */ return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * 主要就是有这么一种情况: * 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。 * 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。 * * 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。 * * 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。 * 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。 * 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。 */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
循环的根据key节点与它的父节点来判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。
可以理解为一个树形的结构,最小点堆的结构;父节点一定小于子节点;
DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序(time小的排在前面),若time相同则根据sequenceNumber排序( sequenceNumber小的排在前面);