Java并发编程之调度线程池

简介: Java并发编程之调度线程池

调度线程池:



  1. 调度线程池是线程池类ThreadPoolExecutor的子类,复用了线程池的计算框架,主要用于解决任务在一定的时间间隔后重复执行的问题。


  1. 例子


public class ScheduledThreadPoolTest {
    /**
     * 以固定的频率调度,包括任务执行的时间
     */
    public static void scheduledAtFixRate(){
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //任务开始执行的延迟时间
        int initDelay = 1;
        //任务循环执行的间隔
        int period = 5;
        scheduledExecutorService.scheduleAtFixedRate(new Task(),initDelay,period,TimeUnit.SECONDS);
    }
    /**
     * 以固定的时间间隔调度,不包括任务执行的时间
     */
    public static void scheduledWithFixDelay(){
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //任务开始执行的延迟时间
        int initDelay = 1;
        //任务循环执行的间隔
        int period = 5;
        scheduledExecutorService.scheduleWithFixedDelay(new Task(),initDelay,period,TimeUnit.SECONDS);
    }
    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println(String.format("开始执行任务调度,执行线程:%s",Thread.currentThread().getName()));
            try {
                //模拟任务的执行时间为5秒
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务调度完成");
        }
    }
    public static void main(String[] args){
        ScheduledThreadPoolTest.scheduledAtFixRate();
        ScheduledThreadPoolTest.scheduledWithFixDelay();
    }
}


(1)其中,ScheduledThreadPoolTest.scheduleAtFixRate()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleAtFixedRate执行任务Task,任务Task的执行时间为5s。


(2)ScheduledThreadPoolTest.scheduleWithFixDelay()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleWithFixedDelay执行任务Task,任务Task的执行时间为5s。


(3)那么,这两个方法的区别在哪里呢?scheduleAtFixRate是以固定的频率执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,6s,11s,16s,…,该方法和任务本身的执行时间无关,以固定的频率触发任务执行。而scheduleWithFixDelay是以固定的延迟执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,11s,21s,31s,…,该方法和任务本身的执行时间相关,在任务执行完成后间隔5s再开始执行下一次任务。


3.ScheduledExecutorService:


//Executors类的newScheduledThreadPool方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
 }


//可以看到该类是ThreadPoolExecutor的子类,并且构造函数里面向父类传递了一个类型为DelayedWorkQueue的阻塞队列.
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    ...
}


4.scheduleAtFixedRate:


 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
     if (command == null || unit == null)
         throw new NullPointerException();
     if (period <= 0)
         throw new IllegalArgumentException();
     //将待执行的任务进行包装,生成一个调度任务,任务循环执行的秘密就在这个调度任务里
     ScheduledFutureTask<Void> sft =
         new ScheduledFutureTask<Void>(command,
                                       null,
                                       triggerTime(initialDelay, unit),
                                       unit.toNanos(period));
     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     sft.outerTask = t;
     delayedExecute(t);
     return t;
 }
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        //将任务添加进阻塞队列,这个阻塞队列就是上面提的DelayedWorkQueue
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}


5.ScheduledFutureTask是如何实现任务循环执行:


private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
    /**
         * 任务执行的具体方法,还记得ThreadPoolExecutor的runWorker方法吗?方法内部会调用任务的run()方法以执行具体的任务
         */
    public void run() {
        //判断任务是否需要周期执行
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        //执行具体的任务,调用最原始任务的run()方法    
        else if (ScheduledFutureTask.super.runAndReset()) {
            //计算任务下一次开始执行的时间
            setNextRunTime();
            //重新将任务放入阻塞队列,DelayedWorkQueue
            reExecutePeriodic(outerTask);
        }
    } 
    //计算任务下一次执行时间
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    } 
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            //将任务放入阻塞队列,DelayedWorkQueue
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }   
}


可以看到这里的关键是在执行任务的同时,计算任务的下一次执行时间,并用原始任务生成一个新的任务重新放入阻塞队列等待执行。


6.阻塞队列DelayedWorkQueue:


static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable>
     //线程池的Worker会从阻塞队列中取任务,调用的就是队列的take方法
     public RunnableScheduledFuture<?> take() throws InterruptedException {
            //多线程操作,要先获取锁
            final ReentrantLock lock = this.lock;
            //以可中断的方式获取锁,线程池被关闭的时候会中断内部线程
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        //如果队列中无任务需要等待
                        available.await();
                    else {
                        //计算任务是否到达执行时间,<0代表到达执行时间直接返回,否则等待
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //从队列中出队,并维护堆(此处队列内部是用堆来实现的)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        //leader不为空,说明其他线程正在获取任务,当前线程必须等待
                        if (leader != null)
                            available.await();
                        else {//leander指的是处理队列中第一个任务的线程,leader为空,将当前线程设置为leader
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 阻塞一定时间(调度任务的时间)
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    //任务获取成功后,唤醒其他等待的线程
                    available.signal();
                lock.unlock();
            }
        }
}


目录
相关文章
|
2天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
35 14
|
6天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
38 17
|
15天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
50 26
|
5天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
34 13
|
6天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
25天前
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
65 16
|
1月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
109 17
|
2月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
1月前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
2月前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。