定时任务实现原理详解(下)

简介: 在很多业务的系统中,我们常常需要定时的执行一些任务,例如定时发短信、定时变更数据、定时发起促销活动等等。 在上篇文章中,我们简单的介绍了定时任务的使用方式,不同的架构对应的解决方案也有所不同,总结起来主要分单机和分布式两大类,本文会重点分析下单机的定时任务实现原理以及优缺点,分布式框架的实现原理会在后续文章中进行分析。

简单的使用示例:

public static void main(String[] args) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
    //启动1秒之后,每隔1秒执行一次
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test3");
        }
    }),1,1, TimeUnit.SECONDS);
    //启动1秒之后,每隔3秒执行一次
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test4");
        }
    }),1,3, TimeUnit.SECONDS);
}

同样的,我们首先打开源码,看看里面到底做了啥

  • 进入scheduleAtFixedRate()方法

首先是校验基本参数,然后将任务作为封装到ScheduledFutureTask线程中,ScheduledFutureTask继承自RunnableScheduledFuture,并作为参数调用delayedExecute()方法进行预处理

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
  • 继续看delayedExecute()方法

可以很清晰的看到,当线程池没有关闭的时候,会通过super.getQueue().add(task)操作,将任务加入到队列,同时调用ensurePrestart()方法做预处理

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
   //预处理
            ensurePrestart();
    }
}

其中super.getQueue()得到的是一个自定义的new DelayedWorkQueue()阻塞队列,数据存储方面也是一个最小堆结构的队列,这一点在初始化new ScheduledThreadPoolExecutor()的时候,可以看出!

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

打开源码可以看到,DelayedWorkQueue其实是ScheduledThreadPoolExecutor中的一个静态内部类,在添加的时候,会将任务加入到RunnableScheduledFuture数组中,同时线程池中的Woker线程会通过调用任务队列中的take()方法获取对应的ScheduledFutureTask线程任务,接着执行对应的任务线程

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;   
    //....
    public boolean add(Runnable e) {
        return offer(e);
    }
    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
}
  • 回到我们最开始说到的ScheduledFutureTask任务线程类,最终执行任务的其实就是它

ScheduledFutureTask任务线程,才是真正执行任务的线程类,只是绕了一圈,做了很多包装,run()方法就是真正执行定时任务的方法。

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;
    /** The time the task is enabled to execute in nanoTime units */
    private long time;
    /**
     * Period in nanoseconds for repeating tasks.  A positive
     * value indicates fixed-rate execution.  A negative value
     * indicates fixed-delay execution.  A value of 0 indicates a
     * non-repeating task.
     */
    private final long period;
    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;
    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
 //...
}

3.3、小结

ScheduledExecutorService 相比 Timer 定时器,完美的解决上面说到的 Timer 存在的两个缺点!

在单体应用里面,使用 ScheduledExecutorService 可以解决大部分需要使用定时任务的业务需求!

但是这是否意味着它是最佳的解决方案呢?

我们发现线程池中 ScheduledExecutorService 的排序容器跟 Timer 一样,都是采用最小堆的存储结构,新任务加入排序效率是O(log(n)),执行取任务是O(1)

这里的写入排序效率其实是有空间可提升的,有可能优化到O(1)的时间复杂度,也就是我们下面要介绍的时间轮实现

四、时间轮实现

所谓时间轮(RingBuffer)实现,从数据结构上看,简单的说就是循环队列,从名称上看可能感觉很抽象。

它其实就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。

78.jpg

插入、取值流程:

  • 1.当我们需要新建一个 1s 延时任务的时候,则只需要将它放到下标为 1 的那个槽中,2、3、...、7也同样如此。
  • 2.而如果是新建一个 10s 的延时任务,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,也就是 1 圈,不然就和 2 秒的延时消息重复了
  • 3.当创建一个 21s 的延时任务时,它所在的位置就在下标为 5 的槽中,同样的需要为他加上圈数为 2,依次类推...

因此,总结起来有两个核心的变量:

  • 数组下标:表示某个任务延迟时间,从数据操作上对执行时间点进行取余
  • 圈数:表示需要循环圈数

通过这张图可以更直观的理解!

79.jpg

当我们需要取出延时任务时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可,取任务的时间消耗为O(1)

当我们需要插入任务式,也只需要计算出对应的下表和圈数,即可将任务插入到对应的数组位置中,插入任务的时间消耗为O(1)

如果时间轮的槽比较少,会导致某一个槽上的任务非常多,那么效率也比较低,这就和 HashMap 的 hash 冲突是一样的,因此在设计槽的时候不能太大也不能太小。

4.1、代码实现

  • 首先创建一个RingBufferWheel时间轮定时任务管理器
public class RingBufferWheel {
    private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
    /**
     * default ring buffer size
     */
    private static final int STATIC_RING_SIZE = 64;
    private Object[] ringBuffer;
    private int bufferSize;
    /**
     * business thread pool
     */
    private ExecutorService executorService;
    private volatile int size = 0;
    /***
     * task stop sign
     */
    private volatile boolean stop = false;
    /**
     * task start sign
     */
    private volatile AtomicBoolean start = new AtomicBoolean(false);
    /**
     * total tick times
     */
    private AtomicInteger tick = new AtomicInteger();
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger taskId = new AtomicInteger();
    private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);
    /**
     * Create a new delay task ring buffer by default size
     *
     * @param executorService the business thread pool
     */
    public RingBufferWheel(ExecutorService executorService) {
        this.executorService = executorService;
        this.bufferSize = STATIC_RING_SIZE;
        this.ringBuffer = new Object[bufferSize];
    }
    /**
     * Create a new delay task ring buffer by custom buffer size
     *
     * @param executorService the business thread pool
     * @param bufferSize      custom buffer size
     */
    public RingBufferWheel(ExecutorService executorService, int bufferSize) {
        this(executorService);
        if (!powerOf2(bufferSize)) {
            throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.ringBuffer = new Object[bufferSize];
    }
    /**
     * Add a task into the ring buffer(thread safe)
     *
     * @param task business task extends {@link Task}
     */
    public int addTask(Task task) {
        int key = task.getKey();
        int id;
        try {
            lock.lock();
            int index = mod(key, bufferSize);
            task.setIndex(index);
            Set<Task> tasks = get(index);
            int cycleNum = cycleNum(key, bufferSize);
            if (tasks != null) {
                task.setCycleNum(cycleNum);
                tasks.add(task);
            } else {
                task.setIndex(index);
                task.setCycleNum(cycleNum);
                Set<Task> sets = new HashSet<>();
                sets.add(task);
                put(key, sets);
            }
            id = taskId.incrementAndGet();
            task.setTaskId(id);
            taskMap.put(id, task);
            size++;
        } finally {
            lock.unlock();
        }
        start();
        return id;
    }
    /**
     * Cancel task by taskId
     * @param id unique id through {@link #addTask(Task)}
     * @return
     */
    public boolean cancel(int id) {
        boolean flag = false;
        Set<Task> tempTask = new HashSet<>();
        try {
            lock.lock();
            Task task = taskMap.get(id);
            if (task == null) {
                return false;
            }
            Set<Task> tasks = get(task.getIndex());
            for (Task tk : tasks) {
                if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
                    size--;
                    flag = true;
                    taskMap.remove(id);
                } else {
                    tempTask.add(tk);
                }
            }
            //update origin data
            ringBuffer[task.getIndex()] = tempTask;
        } finally {
            lock.unlock();
        }
        return flag;
    }
    /**
     * Thread safe
     *
     * @return the size of ring buffer
     */
    public int taskSize() {
        return size;
    }
    /**
     * Same with method {@link #taskSize}
     * @return
     */
    public int taskMapSize(){
        return taskMap.size();
    }
    /**
     * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
     */
    public void start() {
        if (!start.get()) {
            if (start.compareAndSet(start.get(), true)) {
                logger.info("Delay task is starting");
                Thread job = new Thread(new TriggerJob());
                job.setName("consumer RingBuffer thread");
                job.start();
                start.set(true);
            }
        }
    }
    /**
     * Stop consumer ring buffer thread
     *
     * @param force True will force close consumer thread and discard all pending tasks
     *              otherwise the consumer thread waits for all tasks to completes before closing.
     */
    public void stop(boolean force) {
        if (force) {
            logger.info("Delay task is forced stop");
            stop = true;
            executorService.shutdownNow();
        } else {
            logger.info("Delay task is stopping");
            if (taskSize() > 0) {
                try {
                    lock.lock();
                    condition.await();
                    stop = true;
                } catch (InterruptedException e) {
                    logger.error("InterruptedException", e);
                } finally {
                    lock.unlock();
                }
            }
            executorService.shutdown();
        }
    }
    private Set<Task> get(int index) {
        return (Set<Task>) ringBuffer[index];
    }
    private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }
    /**
     * Remove and get task list.
     * @param key
     * @return task list
     */
    private Set<Task> remove(int key) {
        Set<Task> tempTask = new HashSet<>();
        Set<Task> result = new HashSet<>();
        Set<Task> tasks = (Set<Task>) ringBuffer[key];
        if (tasks == null) {
            return result;
        }
        for (Task task : tasks) {
            if (task.getCycleNum() == 0) {
                result.add(task);
                size2Notify();
            } else {
                // decrement 1 cycle number and update origin data
                task.setCycleNum(task.getCycleNum() - 1);
                tempTask.add(task);
            }
            // remove task, and free the memory.
            taskMap.remove(task.getTaskId());
        }
        //update origin data
        ringBuffer[key] = tempTask;
        return result;
    }
    private void size2Notify() {
        try {
            lock.lock();
            size--;
            if (size == 0) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    private boolean powerOf2(int target) {
        if (target < 0) {
            return false;
        }
        int value = target & (target - 1);
        if (value != 0) {
            return false;
        }
        return true;
    }
    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get();
        return target & (mod - 1);
    }
    private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }
    /**
     * An abstract class used to implement business.
     */
    public abstract static class Task extends Thread {
        private int index;
        private int cycleNum;
        private int key;
        /**
         * The unique ID of the task
         */
        private int taskId ;
        @Override
        public void run() {
        }
        public int getKey() {
            return key;
        }
        /**
         *
         * @param key Delay time(seconds)
         */
        public void setKey(int key) {
            this.key = key;
        }
        public int getCycleNum() {
            return cycleNum;
        }
        private void setCycleNum(int cycleNum) {
            this.cycleNum = cycleNum;
        }
        public int getIndex() {
            return index;
        }
        private void setIndex(int index) {
            this.index = index;
        }
        public int getTaskId() {
            return taskId;
        }
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }
    private class TriggerJob implements Runnable {
        @Override
        public void run() {
            int index = 0;
            while (!stop) {
                try {
                    Set<Task> tasks = remove(index);
                    for (Task task : tasks) {
                        executorService.submit(task);
                    }
                    if (++index > bufferSize - 1) {
                        index = 0;
                    }
                    //Total tick number of records
                    tick.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    logger.error("Exception", e);
                }
            }
            logger.info("Delay task has stopped");
        }
    }
}
  • 接着,编写一个客户端,测试客户端
public static void main(String[] args) {
    RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
    for (int i = 0; i < 3; i++) {
        RingBufferWheel.Task job = new Job();
        job.setKey(i);
        ringBufferWheel.addTask(job);
    }
}
public static class Job extends RingBufferWheel.Task{
    @Override
    public void run() {
        System.out.println("test5");
    }
}

运行结果:

test5
test5
test5

如果要周期性执行任务,可以在任务执行完成之后,再重新加入到时间轮中。

详细源码分析地址:[https://crossoverjie.top/2019/09/27/algorithm/time%20wheel/]


4.2、应用

时间轮的应用还是非常广的,例如在 Disruptor 项目中就运用到了 RingBuffer,还有Netty中的HashedWheelTimer工具原理也差不多等等,有兴趣的同学,可以阅读一下官方对应的源码!

五、小结

本文主要围绕单体应用中的定时任务原理进行分析,可能也有理解不对的地方,欢迎批评指出!

相关文章
|
1月前
|
Java 调度 Spring
SpringBoot实现多线程定时任务动态定时任务配置文件配置定时任务
SpringBoot实现多线程定时任务动态定时任务配置文件配置定时任务
322 0
|
1月前
|
Java Spring
定时任务里面的任务多线程操作
该内容是关于Spring Boot中配置异步任务和定时任务的代码示例。首先通过`@Configuration`和`@EnableAsync`开启异步支持,然后定义线程池,如使用`ThreadPoolExecutor`并设置核心线程数、最大线程数等参数。接着,在需要异步执行的方法上添加`@Async`注解。此外,通过`@EnableScheduling`开启定时任务,并使用`@Scheduled`定义具体任务和执行周期。若需指定多个线程池,可以创建不同的`Executor` bean,并在`@Async`中指定线程池名称。
28 2
|
9月前
|
Java Spring
SpringBoot核心特性——异步任务和定时任务那些事
前言 通常情况下,SpringMVC接收到请求后会将请求具体分发给单个线程进行处理。如果请求处理中涉及到比较耗时的操作,为了能更快地将响应返回给用户,那么就需要将耗时的业务操作交由别的线程进行异步处理,而SpringBoot已经为我们提供了这样的实现。
489 2
SpringBoot核心特性——异步任务和定时任务那些事
|
8月前
|
存储 Java 关系型数据库
ShedLock的4种使用方式(分布式定时任务锁)
ShedLock的4种使用方式(分布式定时任务锁)
120 0
|
10月前
|
Java 调度
架构系列——定时任务中的Timer类使用简析
架构系列——定时任务中的Timer类使用简析
|
10月前
|
NoSQL Redis
基于Redis在定时任务里判断其他定时任务是否已经正常执行完的方案
基于Redis在定时任务里判断其他定时任务是否已经正常执行完的方案
73 0
|
10月前
|
Java
简单实现一个定时任务
简单实现一个定时任务
102 0
|
负载均衡 Java 数据挖掘
定时任务实现的几种方式
定时任务实现的几种方式
248 1
|
11月前
|
关系型数据库 MySQL 调度
定时任务优化
简单描述一下定时任务的优化
108 0
|
11月前
|
Java 调度 开发者
SpringBoot中的定时任务的同步与异步你确定真的知道?
定时任务调度功能在我们的开发中是非常常见的,随便举几个例子:定时清除一些过期的数据,定时发送邮件等等,实现定时任务调度的方式也十分多样,本篇文章主要学习各种实现定时任务调度方式的优缺点,以便为日后选择的时候提供一定的参考。