定时任务实现原理详解(上)

简介: 在很多业务的系统中,我们常常需要定时的执行一些任务,例如定时发短信、定时变更数据、定时发起促销活动等等。在上篇文章中,我们简单的介绍了定时任务的使用方式,不同的架构对应的解决方案也有所不同,总结起来主要分单机和分布式两大类,本文会重点分析下单机的定时任务实现原理以及优缺点,分布式框架的实现原理会在后续文章中进行分析。

一、摘要

在很多业务的系统中,我们常常需要定时的执行一些任务,例如定时发短信、定时变更数据、定时发起促销活动等等。

在上篇文章中,我们简单的介绍了定时任务的使用方式,不同的架构对应的解决方案也有所不同,总结起来主要分单机分布式两大类,本文会重点分析下单机的定时任务实现原理以及优缺点,分布式框架的实现原理会在后续文章中进行分析。

从单机角度,定时任务实现主要有以下 3 种方案:

  • while + sleep 组合
  • 最小堆实现
  • 时间轮实现

二、while+sleep组合

while+sleep 方案,简单的说,就是定义一个线程,然后 while 循环,通过 sleep 延迟时间来达到周期性调度任务。

简单示例如下:

public static void main(String[] args) {
    final long timeInterval = 5000;
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "每隔5秒执行一次");
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();
}

实现上非常简单,如果我们想在创建一个每隔3秒钟执行一次任务,怎么办呢?

同样的,也可以在创建一个线程,然后间隔性的调度方法;但是如果创建了大量这种类型的线程,这个时候会发现大量的定时任务线程在调度切换时性能消耗会非常大,而且整体效率低!

面对这种在情况,大佬们也想到了,于是想出了用一个线程将所有的定时任务存起来,事先排好序,按照一定的规则来调度,这样不就可以极大的减少每个线程的切换消耗吗?

正因此,JDK 中的 Timer 定时器由此诞生了!

三、最小堆实现

所谓最小堆方案,正如我们上面所说的,每当有新任务加入的时候,会把需要即将要执行的任务排到前面,同时会有一个线程不断的轮询判断,如果当前某个任务已经到达执行时间点,就会立即执行,具体实现代表就是 JDK 中的 Timer 定时器!

3.1、Timer

首先我们来一个简单的 Timer 定时器例子

public static void main(String[] args) {
    Timer timer = new Timer();
    //每隔1秒调用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test1");
        }
    }, 1000, 1000);
    //每隔3秒调用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test2");
        }
    }, 3000, 3000);
}

实现上,好像跟我们上面介绍的 while+sleep 方案差不多,同样也是起一个TimerTask线程任务,只不过共用一个Timer调度器。

下面我们一起来打开源码看看里面到底有些啥!

  • 进入Timer.schedule()方法

从方法上可以看出,这里主要做参数验证,其中TimerTask是一个线程任务,delay表示延迟多久执行(单位毫秒),period表示多久执行一次(单位毫秒)

public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}
  • 接着看sched()方法

这步操作中,可以很清晰的看到,在同步代码块里,会将task对象加入到queue

private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");
    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;
    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");
        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}
  • 我们继续来看queue对象

任务会将入到TaskQueue队列中,同时在Timer初始化阶段会将TaskQueue作为参数传入到TimerThread线程中,并且起到线程

public class Timer {
    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    public Timer() {
        this("Timer-" + serialNumber());
    }
    public Timer(String name) {
        thread.setName(name);
        thread.start();
    }
    //...
}
  • TaskQueue其实是一个最小堆的数据实体类,源码如下

每当有新元素加入的时候,会对原来的数组进行重排,会将即将要执行的任务排在数组的前面

class TaskQueue {
    private TimerTask[] queue = new TimerTask[128];
    private int size = 0;
    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);
        queue[++size] = task;
        fixUp(size);
    }
    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
   queue[j] = queue[k];
   queue[k] = tmp;
            k = j;
        }
    }
 //....
}
  • 最后我们来看看TimerThread

TimerThread其实就是一个任务调度线程,首先从TaskQueue里面获取排在最前面的任务,然后判断它是否到达任务执行时间点,如果已到达,就会立刻执行任务

class TimerThread extends Thread {
    boolean newTasksMayBeScheduled = true;
    private TaskQueue queue;
    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }
    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die
                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

总结这个利用最小堆实现的方案,相比 while + sleep 方案,多了一个线程来管理所有的任务,优点就是减少了线程之间的性能开销,提升了执行效率;但是同样也带来的了一些缺点,整体的新加任务写入效率变成了 O(log(n))。

同时,细心的发现,这个方案还有以下几个缺点:

  • 串行阻塞:调度线程只有一个,长任务会阻塞短任务的执行,例如,A任务跑了一分钟,B任务至少需要等1分钟才能跑
  • 容错能力差:没有异常处理能力,一旦一个任务执行故障,后续任务都无法执行

3.2、ScheduledThreadPoolExecutor

鉴于 Timer 的上述缺陷,从 Java 5 开始,推出了基于线程池设计的 ScheduledThreadPoolExecutor 。

77.jpg


其设计思想是,每一个被调度的任务都会由线程池来管理执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduledThreadPoolExecutor 才会真正启动一个线程,其余时间 ScheduledThreadPoolExecutor 都是在轮询任务的状态。

相关文章
|
10月前
|
数据采集 存储 Web App开发
Python爬虫技巧:设置Cookie永不超时的详细指南
Python爬虫技巧:设置Cookie永不超时的详细指南
|
运维 Kubernetes Cloud Native
主流定时任务解决方案全横评
定时任务作为一种按照约定时间执行预期逻辑的通用模式,在企业级开发中承载着丰富的业务场景,诸如后台定时同步数据生成报表,定时清理磁盘日志文件,定时扫描超时订单进行补偿回调等。
主流定时任务解决方案全横评
|
监控 负载均衡 Java
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
本文图文详解 Spring Cloud 的五大核心组件,帮助深入理解和掌握微服务架构。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
5 大 SpringCloud 核心组件详解,8 张图彻底弄懂
|
Java Apache Maven
HttpClientConnectionManager哪个版本里有?
【8月更文挑战第25天】HttpClientConnectionManager哪个版本里有?
726 2
|
缓存 Dubbo Java
理解的Java中SPI机制
本文深入解析了JDK提供的Java SPI(Service Provider Interface)机制,这是一种基于接口编程、策略模式与配置文件组合实现的动态加载机制,核心在于解耦。文章通过具体示例介绍了SPI的使用方法,包括定义接口、创建配置文件及加载实现类的过程,并分析了其原理与优缺点。SPI适用于框架扩展或替换场景,如JDBC驱动加载、SLF4J日志实现等,但存在加载效率低和线程安全问题。
632 7
理解的Java中SPI机制
|
存储 消息中间件 JSON
DDD基础教程:一文带你读懂DDD分层架构
DDD基础教程:一文带你读懂DDD分层架构
|
Docker 容器
【赵渝强老师】使用二进制包方式安装Docker
本文介绍了在企业生产环境中无法直接访问外网时,如何使用Docker官方提供的二进制包进行Docker的离线安装。文章详细列出了从安装wget、下载Docker安装包、解压、复制命令到启动Docker服务的具体步骤,并提供了相关命令和示例图片。最后,还介绍了如何设置Docker为开机自启模式。
826 0
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
机器学习/深度学习 分布式计算 大数据
一文读懂Apache Beam:统一的大数据处理模型与工具
【4月更文挑战第8天】Apache Beam是开源的统一大数据处理模型,提供抽象化编程模型,支持批处理和流处理。它提倡"一次编写,到处运行",可在多种引擎(如Spark、Dataflow、Flink)上运行。Beam的核心特性包括抽象化概念(PCollection、PTransform和PipelineRunner)、灵活性(支持多种数据源和转换)和高效执行。它广泛应用在ETL、实时流处理、机器学习和大数据仓库场景,助力开发者轻松应对数据处理挑战。
3252 1
|
缓存 负载均衡 数据库
构建高性能微服务架构:策略与实践
【5月更文挑战第21天】 随着现代业务需求的不断演进,传统的单体应用架构已难以满足快速迭代和灵活部署的要求。微服务架构应运而生,以其服务的细粒度、独立性和弹性优势成为企业转型的重要选择。本文将深入探讨如何构建一个高性能的微服务系统,从服务划分原则到性能优化技巧,旨在为开发者提供一套系统的微服务性能提升方案。文章将重点讨论在设计高并发、低延迟的微服务时需要考虑的关键因素,包括服务治理、缓存策略、负载均衡以及异步通信机制等。