public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; }
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延迟执行 delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { // 加入任务队列 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 确保执行 ensurePrestart(); } } // 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
FutureTask 定时是通过LockSupport.parkNanos(this, nanos);LockSupport.park(this);
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } //注意这里 LockSupport.parkNanos(this, nanos); } else //注意这里 LockSupport.park(this); } }
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor(); Runnable runnable1 = () -> { try { Thread.sleep(4000); System.out.println("11111111111111"); } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable runnable2 = () -> { try { Thread.sleep(4000); System.out.println("222"); } catch (InterruptedException e) { e.printStackTrace(); } }; single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS); single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);
11111111111111 222 11111111111111 222 11111111111111
actionService = Executors.newSingleThreadScheduledExecutor(); actionService.scheduleWithFixedDelay(() -> { try { Thread.currentThread().setName("robotActionService"); Integer robotId = robotQueue.poll(); if (robotId == null) { // 关闭线程池 actionService.shutdown(); } else { int aiLv = robots.get(robotId); if (actionQueueMap.containsKey(aiLv)) { ActionQueue actionQueue = actionQueueMap.get(aiLv); actionQueue.doAction(robotId); } } } catch (Exception e) { // 捕捉异常 LOG.error("",e); } }, 1, 1, TimeUnit.SECONDS);