public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3); //启动1秒之后,每隔1秒执行一次 executor.scheduleAtFixedRate((new Runnable() { @Override public void run() { System.out.println("test3"); } }),1,1, TimeUnit.SECONDS); //启动1秒之后,每隔3秒执行一次 executor.scheduleAtFixedRate((new Runnable() { @Override public void run() { System.out.println("test4"); } }),1,3, TimeUnit.SECONDS); }
- 进入
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
- 继续看
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //预处理 ensurePrestart(); } }
得到的是一个自定义的new DelayedWorkQueue()
阻塞队列,数据存储方面也是一个最小堆结构的队列,这一点在初始化new ScheduledThreadPoolExecutor()
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; //.... public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } }
- 回到我们最开始说到的
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } //... }
ScheduledExecutorService 相比 Timer 定时器,完美的解决上面说到的 Timer 存在的两个缺点!
在单体应用里面,使用 ScheduledExecutorService 可以解决大部分需要使用定时任务的业务需求!
我们发现线程池中 ScheduledExecutorService 的排序容器跟 Timer 一样,都是采用最小堆的存储结构,新任务加入排序效率是O(log(n))
它其实就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。
- 1.当我们需要新建一个 1s 延时任务的时候,则只需要将它放到下标为 1 的那个槽中,2、3、...、7也同样如此。
- 2.而如果是新建一个 10s 的延时任务,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,也就是 1 圈,不然就和 2 秒的延时消息重复了
- 3.当创建一个 21s 的延时任务时,它所在的位置就在下标为 5 的槽中,同样的需要为他加上圈数为 2,依次类推...
- 数组下标:表示某个任务延迟时间,从数据操作上对执行时间点进行取余
- 圈数:表示需要循环圈数
如果时间轮的槽比较少,会导致某一个槽上的任务非常多,那么效率也比较低,这就和 HashMap 的 hash 冲突是一样的,因此在设计槽的时候不能太大也不能太小。
- 首先创建一个
public class RingBufferWheel { private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class); /** * default ring buffer size */ private static final int STATIC_RING_SIZE = 64; private Object[] ringBuffer; private int bufferSize; /** * business thread pool */ private ExecutorService executorService; private volatile int size = 0; /*** * task stop sign */ private volatile boolean stop = false; /** * task start sign */ private volatile AtomicBoolean start = new AtomicBoolean(false); /** * total tick times */ private AtomicInteger tick = new AtomicInteger(); private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private AtomicInteger taskId = new AtomicInteger(); private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16); /** * Create a new delay task ring buffer by default size * * @param executorService the business thread pool */ public RingBufferWheel(ExecutorService executorService) { this.executorService = executorService; this.bufferSize = STATIC_RING_SIZE; this.ringBuffer = new Object[bufferSize]; } /** * Create a new delay task ring buffer by custom buffer size * * @param executorService the business thread pool * @param bufferSize custom buffer size */ public RingBufferWheel(ExecutorService executorService, int bufferSize) { this(executorService); if (!powerOf2(bufferSize)) { throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2"); } this.bufferSize = bufferSize; this.ringBuffer = new Object[bufferSize]; } /** * Add a task into the ring buffer(thread safe) * * @param task business task extends {@link Task} */ public int addTask(Task task) { int key = task.getKey(); int id; try { lock.lock(); int index = mod(key, bufferSize); task.setIndex(index); Set<Task> tasks = get(index); int cycleNum = cycleNum(key, bufferSize); if (tasks != null) { task.setCycleNum(cycleNum); tasks.add(task); } else { task.setIndex(index); task.setCycleNum(cycleNum); Set<Task> sets = new HashSet<>(); sets.add(task); put(key, sets); } id = taskId.incrementAndGet(); task.setTaskId(id); taskMap.put(id, task); size++; } finally { lock.unlock(); } start(); return id; } /** * Cancel task by taskId * @param id unique id through {@link #addTask(Task)} * @return */ public boolean cancel(int id) { boolean flag = false; Set<Task> tempTask = new HashSet<>(); try { lock.lock(); Task task = taskMap.get(id); if (task == null) { return false; } Set<Task> tasks = get(task.getIndex()); for (Task tk : tasks) { if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) { size--; flag = true; taskMap.remove(id); } else { tempTask.add(tk); } } //update origin data ringBuffer[task.getIndex()] = tempTask; } finally { lock.unlock(); } return flag; } /** * Thread safe * * @return the size of ring buffer */ public int taskSize() { return size; } /** * Same with method {@link #taskSize} * @return */ public int taskMapSize(){ return taskMap.size(); } /** * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop} */ public void start() { if (!start.get()) { if (start.compareAndSet(start.get(), true)) { logger.info("Delay task is starting"); Thread job = new Thread(new TriggerJob()); job.setName("consumer RingBuffer thread"); job.start(); start.set(true); } } } /** * Stop consumer ring buffer thread * * @param force True will force close consumer thread and discard all pending tasks * otherwise the consumer thread waits for all tasks to completes before closing. */ public void stop(boolean force) { if (force) { logger.info("Delay task is forced stop"); stop = true; executorService.shutdownNow(); } else { logger.info("Delay task is stopping"); if (taskSize() > 0) { try { lock.lock(); condition.await(); stop = true; } catch (InterruptedException e) { logger.error("InterruptedException", e); } finally { lock.unlock(); } } executorService.shutdown(); } } private Set<Task> get(int index) { return (Set<Task>) ringBuffer[index]; } private void put(int key, Set<Task> tasks) { int index = mod(key, bufferSize); ringBuffer[index] = tasks; } /** * Remove and get task list. * @param key * @return task list */ private Set<Task> remove(int key) { Set<Task> tempTask = new HashSet<>(); Set<Task> result = new HashSet<>(); Set<Task> tasks = (Set<Task>) ringBuffer[key]; if (tasks == null) { return result; } for (Task task : tasks) { if (task.getCycleNum() == 0) { result.add(task); size2Notify(); } else { // decrement 1 cycle number and update origin data task.setCycleNum(task.getCycleNum() - 1); tempTask.add(task); } // remove task, and free the memory. taskMap.remove(task.getTaskId()); } //update origin data ringBuffer[key] = tempTask; return result; } private void size2Notify() { try { lock.lock(); size--; if (size == 0) { condition.signal(); } } finally { lock.unlock(); } } private boolean powerOf2(int target) { if (target < 0) { return false; } int value = target & (target - 1); if (value != 0) { return false; } return true; } private int mod(int target, int mod) { // equals target % mod target = target + tick.get(); return target & (mod - 1); } private int cycleNum(int target, int mod) { //equals target/mod return target >> Integer.bitCount(mod - 1); } /** * An abstract class used to implement business. */ public abstract static class Task extends Thread { private int index; private int cycleNum; private int key; /** * The unique ID of the task */ private int taskId ; @Override public void run() { } public int getKey() { return key; } /** * * @param key Delay time(seconds) */ public void setKey(int key) { this.key = key; } public int getCycleNum() { return cycleNum; } private void setCycleNum(int cycleNum) { this.cycleNum = cycleNum; } public int getIndex() { return index; } private void setIndex(int index) { this.index = index; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } } private class TriggerJob implements Runnable { @Override public void run() { int index = 0; while (!stop) { try { Set<Task> tasks = remove(index); for (Task task : tasks) { executorService.submit(task); } if (++index > bufferSize - 1) { index = 0; } //Total tick number of records tick.incrementAndGet(); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { logger.error("Exception", e); } } logger.info("Delay task has stopped"); } } }
- 接着,编写一个客户端,测试客户端
public static void main(String[] args) { RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2)); for (int i = 0; i < 3; i++) { RingBufferWheel.Task job = new Job(); job.setKey(i); ringBufferWheel.addTask(job); } } public static class Job extends RingBufferWheel.Task{ @Override public void run() { System.out.println("test5"); } }
test5 test5 test5
时间轮的应用还是非常广的,例如在 Disruptor 项目中就运用到了 RingBuffer,还有Netty