Java并发编程之调度线程池

简介: Java并发编程之调度线程池

调度线程池:



  1. 调度线程池是线程池类ThreadPoolExecutor的子类,复用了线程池的计算框架,主要用于解决任务在一定的时间间隔后重复执行的问题。


  1. 例子


public class ScheduledThreadPoolTest {
    /**
     * 以固定的频率调度,包括任务执行的时间
     */
    public static void scheduledAtFixRate(){
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //任务开始执行的延迟时间
        int initDelay = 1;
        //任务循环执行的间隔
        int period = 5;
        scheduledExecutorService.scheduleAtFixedRate(new Task(),initDelay,period,TimeUnit.SECONDS);
    }
    /**
     * 以固定的时间间隔调度,不包括任务执行的时间
     */
    public static void scheduledWithFixDelay(){
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        //任务开始执行的延迟时间
        int initDelay = 1;
        //任务循环执行的间隔
        int period = 5;
        scheduledExecutorService.scheduleWithFixedDelay(new Task(),initDelay,period,TimeUnit.SECONDS);
    }
    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println(String.format("开始执行任务调度,执行线程:%s",Thread.currentThread().getName()));
            try {
                //模拟任务的执行时间为5秒
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务调度完成");
        }
    }
    public static void main(String[] args){
        ScheduledThreadPoolTest.scheduledAtFixRate();
        ScheduledThreadPoolTest.scheduledWithFixDelay();
    }
}


(1)其中,ScheduledThreadPoolTest.scheduleAtFixRate()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleAtFixedRate执行任务Task,任务Task的执行时间为5s。


(2)ScheduledThreadPoolTest.scheduleWithFixDelay()方法,会启动一个调度线程池,并以初始延迟1s,固定频率为5s,使用调度线程池的scheduleWithFixedDelay执行任务Task,任务Task的执行时间为5s。


(3)那么,这两个方法的区别在哪里呢?scheduleAtFixRate是以固定的频率执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,6s,11s,16s,…,该方法和任务本身的执行时间无关,以固定的频率触发任务执行。而scheduleWithFixDelay是以固定的延迟执行任务,在本例中任务开始执行的时间点(以当前时间为起点)为:1s,11s,21s,31s,…,该方法和任务本身的执行时间相关,在任务执行完成后间隔5s再开始执行下一次任务。


3.ScheduledExecutorService:


//Executors类的newScheduledThreadPool方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
 }


//可以看到该类是ThreadPoolExecutor的子类,并且构造函数里面向父类传递了一个类型为DelayedWorkQueue的阻塞队列.
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    ...
}


4.scheduleAtFixedRate:


 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
     if (command == null || unit == null)
         throw new NullPointerException();
     if (period <= 0)
         throw new IllegalArgumentException();
     //将待执行的任务进行包装,生成一个调度任务,任务循环执行的秘密就在这个调度任务里
     ScheduledFutureTask<Void> sft =
         new ScheduledFutureTask<Void>(command,
                                       null,
                                       triggerTime(initialDelay, unit),
                                       unit.toNanos(period));
     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
     sft.outerTask = t;
     delayedExecute(t);
     return t;
 }
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        //将任务添加进阻塞队列,这个阻塞队列就是上面提的DelayedWorkQueue
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}


5.ScheduledFutureTask是如何实现任务循环执行:


private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
    /**
         * 任务执行的具体方法,还记得ThreadPoolExecutor的runWorker方法吗?方法内部会调用任务的run()方法以执行具体的任务
         */
    public void run() {
        //判断任务是否需要周期执行
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        //执行具体的任务,调用最原始任务的run()方法    
        else if (ScheduledFutureTask.super.runAndReset()) {
            //计算任务下一次开始执行的时间
            setNextRunTime();
            //重新将任务放入阻塞队列,DelayedWorkQueue
            reExecutePeriodic(outerTask);
        }
    } 
    //计算任务下一次执行时间
    private void setNextRunTime() {
        long p = period;
        if (p > 0)
            time += p;
        else
            time = triggerTime(-p);
    } 
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            //将任务放入阻塞队列,DelayedWorkQueue
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }   
}


可以看到这里的关键是在执行任务的同时,计算任务的下一次执行时间,并用原始任务生成一个新的任务重新放入阻塞队列等待执行。


6.阻塞队列DelayedWorkQueue:


static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable>
     //线程池的Worker会从阻塞队列中取任务,调用的就是队列的take方法
     public RunnableScheduledFuture<?> take() throws InterruptedException {
            //多线程操作,要先获取锁
            final ReentrantLock lock = this.lock;
            //以可中断的方式获取锁,线程池被关闭的时候会中断内部线程
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        //如果队列中无任务需要等待
                        available.await();
                    else {
                        //计算任务是否到达执行时间,<0代表到达执行时间直接返回,否则等待
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            //从队列中出队,并维护堆(此处队列内部是用堆来实现的)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        //leader不为空,说明其他线程正在获取任务,当前线程必须等待
                        if (leader != null)
                            available.await();
                        else {//leander指的是处理队列中第一个任务的线程,leader为空,将当前线程设置为leader
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 阻塞一定时间(调度任务的时间)
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    //任务获取成功后,唤醒其他等待的线程
                    available.signal();
                lock.unlock();
            }
        }
}


目录
相关文章
|
4天前
|
IDE Java 物联网
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
11 0
|
4天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
4天前
|
安全 Java 开发者
Java并发编程:深入理解Synchronized关键字
【4月更文挑战第19天】 在Java多线程编程中,为了确保数据的一致性和线程安全,我们经常需要使用到同步机制。其中,`synchronized`关键字是最为常见的一种方式,它能够保证在同一时刻只有一个线程可以访问某个对象的特定代码段。本文将深入探讨`synchronized`关键字的原理、用法以及性能影响,并通过具体示例来展示如何在Java程序中有效地应用这一技术。
|
5天前
|
安全 Java
java多线程(一)(火车售票)
java多线程(一)(火车售票)
|
5天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
5天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
17天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
7月前
|
算法 Java 调度
Java由浅入深理解线程池设计和原理1
Java由浅入深理解线程池设计和原理1
133 0
|
1月前
|
缓存 Java 开发者
深入理解Java并发编程:线程池的原理与实践
【2月更文挑战第17天】在多线程的世界中,线程池是管理并发执行的有效工具。本文将探讨线程池的核心原理、设计目的及其在实际Java开发中的应用。我们将从线程池的基本概念出发,深入到它的内部工作机制,并讨论如何合理配置和优化线程池以提高性能和资源利用率。最后,通过实例演示线程池的最佳实践,帮助开发者避免常见的并发陷阱。
|
7月前
|
缓存 Java 调度
Java由浅入深理解线程池设计和原理2
Java由浅入深理解线程池设计和原理2
88 0

热门文章

最新文章