cheduleExecutorService提交死循环任务

简介: 本文探讨了向调度线程池提交死循环任务时的运行机制。通过分析`scheduleAtFixedRate`方法的调用链路,详细解析了任务从包装为`ScheduledFutureTask`对象到最终执行的过程。核心步骤包括:任务被添加至延时队列、线程池启动Worker执行任务、周期性任务的时间计算与重新提交。若提交的是死循环任务,它将永久占用一个线程,可能导致其他任务无法调度。文章还深入剖析了`getTask`方法中任务获取逻辑及延时计算原理,帮助理解线程池内部工作机制。

       首先提出一个问题,如果向一个调度线程池提交一个死循环任务会发生什么?为了内容的完整性,本文会提到一些在上面列出的文章中已经涉及到的内容。
       比如我们运行下面的代码:

       loopRunner里面只有一个死循环什么也不做,当然这是极端情况,更为一般的情况为在for(;;)里面做一些某种驱动类型的工作,比如Netty的EventLoop一样,那样的循环更有意义,但是本文只是为了学习当向一个调度线程池提交了一个死循环任务之后的运行情况。
下面我们就分析一下scheduleAtFixedRate方法的调用链路:
1、将loopRunner包装成一个ScheduledFutureTask对象,ScheduledFutureTask这个类对于调度线程池至关重要
2、再次包装变为RunnableScheduledFuture对象
3、delayedExecute方法运行,确保任务被正确处理,如果线程池已经被关闭了,那么拒绝任务的提交,否则将任务添加到一个延时队列(workQueue)中去,这是一个具有延时功能的阻塞队列,初始容量为16,每次扩容增加50%的容量,最大容量为Integer.MAX_VALUE
4、运行方法ensurePrestart,确保线程池已经开始工作了,如果线程池里面的线程数量还没有达到设定的corePoolSize,那么就添加一个新的Worker,然后让这个Worker去延时队列去获取任务来执行
5、方法addWorker执行,添加一个Worker,然后让他执行我们提交的任务,下面摘取一段addWorker的方法内容:

/**
 * 完整代码见源码,下面只是摘取了部分,去除了一些不影响阅读的部分
 */
 private boolean addWorker(Runnable firstTask, boolean core) {
   

     boolean workerStarted = false;
     boolean workerAdded = false;
     Worker w = null;
     try {
   
         w = new Worker(firstTask);
         final Thread t = w.thread;
         if (t != null) {
   
             final ReentrantLock mainLock = this.mainLock;
             mainLock.lock();
             try {
   
                 int rs = runStateOf(ctl.get());
                 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
   
                     if (t.isAlive()) // precheck that t is startable
                         throw new IllegalThreadStateException();
                     workers.add(w); //添加一个新的worker
                     int s = workers.size();
                     if (s > largestPoolSize)
                         largestPoolSize = s; 
                     workerAdded = true;
                 }
             } finally {
   
                 mainLock.unlock();
             }
             if (workerAdded) {
    //如果添加新的Worker成功,那么就启动它来执行我们提交的任务
                 t.start();
                 workerStarted = true;
             }
         }
     } 
     return workerStarted;
 }
AI 代码解读

6、第五步中最为重要的一句话就是t.start(),这句话的执行会发生什么?首先看这个t是什么东西:

Worker(Runnable firstTask) {
   
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
AI 代码解读

       而this就是Worker自身,而Worker是实现了Runnable的,也就是说,t.start()这句话会执行worker自身的run方法

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
AI 代码解读

7、我们已经知道现在会执行Worker的run方法,下面是run方法的内容:

public void run() {
   
    runWorker(this);
}
final void runWorker(Worker w) {
   
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
   
        while (task != null || (task = getTask()) != null) {
   
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
   
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
   
                    task.run();
                } catch (RuntimeException x) {
   
                    thrown = x; throw x;
                } catch (Error x) {
   
                    thrown = x; throw x;
                } catch (Throwable x) {
   
                    thrown = x; throw new Error(x);
                } finally {
   
                    afterExecute(task, thrown);
                }
            } finally {
   
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
   
        processWorkerExit(w, completedAbruptly);
    }
}
AI 代码解读

       首先从Worker中获取其负责的task,如果task为空,那么就去延时队列获取任务,如果没有获取到任务那么线程就可以休息了,如果获取到,那么继续执行下面的内容。主要的就是一句:task.run(),那这句话会发生什么呢?

8、想要知道task.run()之后会发生什么,就需要知道task是个什么东西,第二步的时候说过,也就是我们的任务,只是被包装成了一个RunnableScheduledFuture对象,那现在就去看RunnableScheduledFuture这个方法里面的run会发生什么,下面展示了其run方法的具体细节:

public void run() {
   
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
   
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
AI 代码解读

       如果这不是一个周期性的任务,那么就执行super的run方法,否则执行runAndReset方法,介于本文是问题导向的文章,所以在此不对super的run方法和runAndReset方法做分析,只要知道这就是执行我们实际提交的任务就好了。也就是说,走到这一步,我们的任务开始运行起来了,也就是我们的那个loopRunner开始无限循环了,下面的代码将永远得不到执行。所以,到这一步就可以解决问题了,向一个调度线程池提交一个死循环的任务,那么这个任务会霸占一个线程一直不会释放,如果很不幸线程池里面只允许有一个线程的话,那么其他提交的任务都将得不到调度执行。

9、为了走通整个流程,我们假设我们提交的不是一个死循环任务,那么提交的任务总是会被执行完的,线程总是会被释放的,那么就会执行setNextRunTime这个方法,下面是这个方法的细节

private void setNextRunTime() {
   
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}
AI 代码解读

       p > 0代表的是scheduleAtFixedRate,p < 0代表的是scheduleWithFixedDelay,两者的区别在于前者总是按照设定的轨迹来设定下次应该调度的时间,而后者总是在任务执行完成之后再根据周期设定下一次应该执行的时间。我们只分析前者。对于第一次提交的任务,time等于当前时间 + 首次延时执行的时间,对于delay等于0的情况下,首次提交任务的time就是当前时间,然后 + p代表的是下一次应该被调度的时间。

10、我们发现,每个任务都是在执行完一次之后再设定下次执行任务的时间的,这也特别关键。设定好下次调度的时间,那么就要开始去准备执行它吧,也就是reExecutePeriodic方法会执行,下面是reExecutePeriodic方法的内容:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
   
    if (canRunInCurrentRunState(true)) {
   
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
AI 代码解读

       这个方法只是将任务重新提交到了延时队列而已,一次完整的流程到底也就结束了,为了内容的完整性,再来分析一下一个Worker从延时队列获取任务时的情况。回到第七步,我们有一个方法没有提到,那就是getTask():

private Runnable getTask() {
   
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
   
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
   
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
   
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
   
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
   
            timedOut = false;
        }
    }
}
AI 代码解读

       我们主要来看两个方法: poll/take,这两个方法都是从延时队列获取一个任务,下面是poll的代码,take会阻塞一直到获取到内容,而poll则不会阻塞:

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
   
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
   
        for (;;) {
   
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
   
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
   
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
   
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
   
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
   
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
   
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
AI 代码解读

       poll的代码最为核心的内容就是,获取队列首部的任务,然后获取其延时时间,这个时间是我们在完成一次调度之后设置的下次调度时间,如果任务的运行时间大于我们设定的周期的话,这个延时时间就是负数的,那么就会被立即执行,否则会等到设定的时间,时间到了再返回给Worker执行。
       最后把getDelay方法的细节粘出来,这样内容就完整了,其中的time是我们设定的:

public long getDelay(TimeUnit unit) {
   
    return unit.convert(time - now(), NANOSECONDS);
}
AI 代码解读
目录
打赏
0
0
0
0
72
分享
相关文章
|
7月前
|
线程池中线程抛了异常,该如何处理?
【8月更文挑战第27天】在Java多线程编程中,线程池(ThreadPool)是一种常用的并发处理工具,它能够有效地管理线程的生命周期,提高资源利用率,并简化并发编程的复杂性。然而,当线程池中的线程在执行任务时抛出异常,如果不妥善处理,这些异常可能会导致程序出现未预料的行为,甚至崩溃。因此,了解并掌握线程池异常处理机制至关重要。
635 0
父子任务使用不当线程池死锁怎么解决?
在Java多线程编程中,线程池有助于提升性能与资源利用效率,但若父子任务共用同一池,则可能诱发死锁。本文通过一个具体案例剖析此问题:在一个固定大小为2的线程池中,父任务直接调用`outerTask`,而`outerTask`再次使用同一线程池异步调用`innerTask`。理论上,任务应迅速完成,但实际上却超时未完成。经由`jstack`输出的线程调用栈分析发现,线程陷入等待状态,形成“死锁”。原因是子任务需待父任务完成,而父任务则需等待子任务执行完毕以释放线程,从而相互阻塞。此问题在测试环境中不易显现,常在生产环境下高并发时爆发,重启或扩容仅能暂时缓解。
120 0
死循环记录
死循环记录
94 0
【JavaSE专栏83】线程插队,一个线程在另一个线程执行特定任务之前先执行
【JavaSE专栏83】线程插队,一个线程在另一个线程执行特定任务之前先执行
154 0
Java并发计算判断线程池中的线程是否全部执行完毕
Java并发计算判断线程池中的线程是否全部执行完毕
127 0
如何处理 JDK 线程池内线程执行异常
如何处理 JDK 线程池内线程执行异常
158 2
线程池内的线程如果全部忙,提交一个新的任务,会发生什么?队列全部塞满了之后,还是忙,再提交会发生什么?
线程池内的线程如果全部忙,提交一个新的任务,会发生什么?队列全部塞满了之后,还是忙,再提交会发生什么?
线程池内的线程如果全部忙,提交一个新的任务,会发生什么?队列全部塞满了之后,还是忙,再提交会发生什么?