ScheduleExecutorService

简介: 本文深入探讨了Java中`ScheduleExecutorService`的周期调度原理,特别是当任务执行时间超过设定调度周期时的行为。通过源码分析发现,任务被封装为`ScheduledFutureTask`并加入延时队列`DelayedWorkQueue`。若任务运行时间超出调度周期,新任务会立即加入队列,导致实际调度间隔等于任务运行时间。文章结合代码实例和关键方法(如`runAndReset`、`setNextRunTime`)解析了调度逻辑,最终得出结论:调度时间将受任务执行时间影响而动态调整。此分析为理解线程池机制提供了详细参考。

       这个问题曾经困扰了我很久,我们都知道,ScheduleExecutorService是一个支持周期调度的线程池,我们可以设置调度的周期period,ScheduleExecutorService会按照设定好的周期调度我们的任务,如果我们设定的调度周期小于任务运行时间,那么很好理解,比如说我们设置的调度周期为1秒,而任务实际只需要10毫秒就可以执行完成一次,那么执行完成之后放到调度队列即可,下次调度时间到了再次调度执行。那么,如果我们的任务执行时间大于我们设定的调度时间会怎么样?比如我们设定的调度周期为1秒,但是我们的任务每次需要执行2秒,这个情况是不是很奇怪呢?
       对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?
当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。
       首先,我们使用下面的代码作为测试:

private static Runnable blockRunner = () -> {
   
    try {
   
        TimeUnit.SECONDS.sleep(2);
        System.out.println("one round:" + new Date());
    } catch (InterruptedException e) {
   
        e.printStackTrace();
    }
};
private static ScheduledExecutorService scheduledExecutorService =
    Executors.newScheduledThreadPool(2);
public static void main(String ... args) {
   
    scheduledExecutorService
        .scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS);
}

       我们设定了调度周期为100毫秒,但是blockRunner实际上需要执行2秒才能返回。关于java的线程池,已经在前面写到了。
       先来看一下scheduleAtFixedRate这个方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
   
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

       我们的任务command被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:

private void delayedExecute(RunnableScheduledFuture<?> task) {
   
    if (isShutdown())
        reject(task);
    else {
   
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

       它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去。这个队列就是ThreadPoolExecutor中的workQueue,而这个workQueue是在ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
   
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

       也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,而DelayedWorkQueue我们在Java阻塞队列详解中已经分析过,它是一个可以延迟消费的阻塞队列。而延时的时间是通过接口Delayed的getDelay方法来获得的,我们最后找到ScheduledFutureTask实现了Delayed的getDelay方法。

public long getDelay(TimeUnit unit) {
   
    return unit.convert(time - now(), NANOSECONDS);
}

       time变量是什么?原来是delay,好像和period无关啊!!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次、第三次以后和初始的delay无关之后的周期调度的情况啊,继续找吧!
然后发现了ScheduledFutureTask的run方法,很明显这就是任务调度被执行的关键所在,看下代码:

public void run() {
   
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
   
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

       最为关键的地方在于:

else if (ScheduledFutureTask.super.runAndReset()) {
   
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }

       首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。

       第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:

protected boolean runAndReset() {
   
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
   
        Callable<V> c = callable;
        if (c != null && s == NEW) {
   
            try {
   
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
   
                setException(ex);
            }
        }
    } finally {
   
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

       关键的地方是c.call()这一句,这个c就是我们提交的任务。
       第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:

private void setNextRunTime() {
   
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

       我们只需要看p>0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,time这个变量会被加p,而p则是我们设定好的period。下面我们找一下这个time是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而time就是在这里被初始化的:

/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
   
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
   
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

       无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
   
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        for (;;) {
   
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
   
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
   
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
   
                        available.awaitNanos(delay);
                    } finally {
   
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
   
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

       可以看到,如果delay小于等于0,那么就是说需要被立即调度,否则延时delay这样一段时间。也就是延时消费。
结论就是:一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。如果我们的任务运行时间大于设置的调度时间,那么效果就是任务运行多长时间,调度时间就会变为多久,因为添加到任务队列的任务的延时时间每次都是负数,所以会被立刻执行

       上面列出了最近写的关于java线程池ScheduleExecutorService的内容,可以作为参考,本文是对ScheduleExecutorService学习和总结的一个收尾,对java线程池技术更为深入的学习和总结将在未来适宜的时候进行。

相关文章
|
资源调度 Java
在SchedulerX中,你可以使用`schedulerx.submitTask(taskName)`方法来提交并执行单个任务
【1月更文挑战第7天】【1月更文挑战第34篇】在SchedulerX中,你可以使用`schedulerx.submitTask(taskName)`方法来提交并执行单个任务
107 1
|
2月前
|
Java 调度
ScheduledThreadPoolExecutor分析
ScheduledThreadPoolExecutor 是一种适用于延时或周期性任务调度的线程池。它继承自 ThreadPoolExecutor,具备更强大的功能,可通过设定参数实现周期性任务调度。 内部实现中,`ScheduledFutureTask` 是关键类,通过 `run` 方法实现任务的循环调度与时间管理。`DelayedWorkQueue` 提供延迟队列功能,利用堆结构确保任务按优先级执行,并通过条件变量实现精确的延迟控制。整体设计结合了循环调度与延迟机制,满足高效的任务管理需求。
|
7月前
|
存储 Linux 调度
APScheduler
【10月更文挑战第09天】
75 2
|
Java 调度
ScheduledExecutorService使用介绍
JUC包(java.util.concurrent)中提供了对定时任务的支持,即ScheduledExecutorService接口。 本文对ScheduledExecutorService的介绍,将基于Timer类使用介绍进行,因此请先阅读Timer类使用介绍文章。
1426 1
|
资源调度 分布式计算 算法
Gang Scheduling
Gang Scheduling(Coscheduling)、FIFO Scheduling、Capacity Scheduling、Fair sharing、Binpack/Spread等是云计算和分布式系统中的任务调度算法,用于在资源有限的情况下,公平、高效地分配任务和资源。下面是这些调度算法的基本介绍和如何在实际应用中使用它们的一些建议:
425 2
|
NoSQL Redis
@Scheduled的使用
@Scheduled的使用
80 0
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
70 0
|
存储 调度
Quartz-SchedulerListener解读
Quartz-SchedulerListener解读
205 0