万字长文简单明了的介绍xxl-job以及quartz

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本文主要介绍分布式定时任务框架xxl-job,本文首先会对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度的详细过程。

前言

本文主要介绍分布式定时任务框架xxl-job,本文首先会对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度的详细过程。

xxl-job的介绍

xxl-job是一个开源的分布式定时任务框架,其调度中心和执行器是相互分离,分开部署的,两者通过HTTP协议进行通信。其架构如下图所示:

调度中心:

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover,支持创建执行器等功能。

执行模块(执行器):

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。

特性

xxl-job的特性有很多,官网上有详细的介绍,这里我会介绍几个重要的特性:

1.简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;

2.动态:支持动态修改任务状态、启动/停止任务,以及终止运行中的任务,都是即时生效的。

3.调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;

4.执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA;

5.调度过期策略:调度中心错过调度时间的补偿处理策略:包括:忽略,立即补偿触发一次等;

6.阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前的调用。

7.任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;

xxl-job相关的数据表

xxl-job将任务信息以及日志信息持久化到数据表中,这个就保证了可以动态的添加删除任务。

1.xxl_job_lock:任务调度锁表,在线程查询任务信息时会调用上锁。

2.xxl_job_group:执行器信息表,维护任务执行器信息;

3.xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;

4.xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;

5.xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;

6.xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;

7.xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;

8.xxl_job_user:系统用户表;

xxl-job与quartz的异同

这一部分主要是将quartz和xxl-job做一个比较,quartz是一款开源的使用非常广泛的定时任务框架。其可以说是定时任务的鼻祖,很多理念都与xxl-job类似。

综合比较

ba2140b8c6b450f083ae081b47794a6e_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTQ1MzQ4MDg=,size_16,color_FFFFFF,t_70.png

整体来说,xxl-job就是quartz的一个增强版,其弥补了quartz不支持并行调度,不支持失败处理策略和动态分片的策略等诸多不足,同时其有管理界面,上手比较容易,支持分布式,适用于分布式场景下的使用。两者相同的是都是通过数据库锁来控制任务不能重复执行。

核心类比较

quartz的核心类如下图所示:

类名 作用
QuartzSchedulerThread 负责执行向QuartzScheduler注册的触发Trigger的工作的线程
ThreadPool Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提供运行效率
QuartzSchedulerResources 包含创建QuartzScheduler实例所需的所有资源(JobStore,ThreadPool等)
SchedulerFactory 生成Scheduler实例
JobStore 通过类实现的接口,这些类要为org.quartz.core.QuartzScheduler的使用提供一个org.quartz.Job和org.quartz.Trigger存储机制。作业和触发器的存储应该以其名称和组的组合为唯一性。
QuartzScheduler 这是Quartz的核心,它是org.quartz.Scheduler接口的间接实现,包含调度org.quartz.Jobs,注册org.quartz.JobListener实例等的方法。
Scheduler 代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger。当Trigger与JobDetail组合,就可以被Scheduler容器调度了。
Trigger 具有所有触发器通用属性的基本接口,描述了job执行的时间出发规则,使用TriggerBuilder实例化实际触发器,即表示什么时候去调用任务
JobDetail 表示一个具体的可执行的调度程序,Job是这个可执行的调度程序所要执行的内容,另外JobDetail还包含了这个任务调度的方案和策略
Job 表示一个工作,即要执行的具体内容

quartz中的类有很多,我们关注并掌握好Schedule(调度容器),Trigger(触发器),JobDetail&Job(定义具体的执行任务)这几个类就掌握了quartz的核心了。因为其余的类都是围绕这几个类转的,下图展示了各个核心类的调用关系:

1b81999dcf70c76884dac7499eb9a657_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTQ1MzQ4MDg=,size_16,color_FFFFFF,t_70.png

quartz的调用示例:

public class RAMQuartz {
    public static void main(String[] args) throws SchedulerException {
   //1.创建Scheduler的工厂
        SchedulerFactory sf = new StdSchedulerFactory();
        //2.从工厂中获取调度器实例
        Scheduler scheduler = sf.getScheduler();
        //3.创建JobDetail
        JobDetail jobDetail = JobBuilder.newJob(RAMJob.class).withDescription("this is a ram job")
                .withIdentity("ramJob", "ramGroup").build();   //job的name和group
        // 4.任务运行的时间,SimpleScheduler类型触发器有效,3秒后启动
        long time = System.currentTimeMillis() + 3 * 1000L;
        Date startTime = new Date(time);
        // 5.创建Trigger
        CronTrigger cronTrigger = TriggerBuilder.newTrigger().withDescription("")
                .withIdentity("ramTrigger", "ramTriggerGroup")
                .startAt(startTime).withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?")) //每10秒跑一次
                .build();
        // 6.注册任务和定时器
        scheduler.scheduleJob(jobDetail, cronTrigger);
        // 7.启动调度器
        scheduler.start();
        System.out.println("启动时间: " + new Date());
 }

其中RAMJob实现了Job接口,并重写了execute方法。

public class RAMJob implements Job {
    public void execute(JobExecutionContext context) throws JobExecutionException {
        System.out.println("Say hello to Quartz " + System.currentTimeMillis());
    }
}

xxl-job的核心类如下图所示:

类名 作用
XxlJobAdminConfig 调度中心的总配置类,负责创建XxlJobScheduler实例
XxlJobScheduler 负责创建各种线程,包括任务注册主线程,调度容器的主线程,以及调度参数的配置线程池JobTriggerPoolHelper
JobScheduleHelper 调度容器,创建一个守护线程查询所有下次执行时间在当前时间5秒内的定时任务,并按条件执行
JobTriggerPoolHelper 创建操作XxlJobTrigger的线程池,并添加trigger
XxlJobTrigger 表示一个调度参数的配置,会查询具体的定时任务信息XxlJobInfo
XxlJob 定义执行器的注解
JobThread 调用IJobHandler的executer执行任务,并回调调度中心
IJobHandler 抽象的执行器接口,定义了要执行的具体内容,同样的也是一个execute方法
EmbedServer 内嵌的Server,默认端口是9999
ExecutorBiz 其中的run方法用于调用执行器,有两个是实现类ExecutorBizImpl以及ExecutorBizClient 。

核心类的调用关系如下图所示:

0340159eebd193b6683dbccac332ce51_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTQ1MzQ4MDg=,size_16,color_FFFFFF,t_70.png

从核心类我们可以看出xxl-job和quartz还是有很多相同点的,都有Scheduler,Trigger以及Job等几个核心的组件。不同之处是xxl-job把任务信息直接存储在了数据表中,而quartz是可以不存的。而且xxl-job调度和执行是分开的,而quartz调度和执行是在一块的。

xxl-job的调度过程

下图展示了调度中心调度执行器执行任务的时序图:

4867b24aed4596d8eb6f3f783a95e1b3_watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTQ1MzQ4MDg=,size_16,color_FFFFFF,t_70.png

1.在XxlJobAdminConfig类的afterPropertiesSet方法中创建XxlJobScheduler实例

@Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

2.在XxlJobScheduler类的init方法中初始化registry,schedule的主线程,并创建JobTriggerPool的线程池。

public void init() throws Exception {
        //省略部分代码
       // admin registry monitor run
        JobRegistryMonitorHelper.getInstance().start();
        // admin trigger pool start
        JobTriggerPoolHelper.toStart();
        // start-schedule
        JobScheduleHelper.getInstance().start();
    }

3.JobScheduleHelper的start方法会创建一个新的线程,在该线程内会首先查询xxl_job_lock获取数据库锁,然后查询5秒内待执行的任务。当前时间大于任务下一次执行的时间,则会调用JobTriggerPoolHelper.trigger进行任务的执行。

下面代码展示了数据库锁的使用。

//上锁
try{
  conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
      //取消事务自动提交
                        conn.setAutoCommit(false);
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();
  finally {
    // commit
    if (conn != null) {
      //提交事务,释放锁
      conn.commit();        
      conn.setAutoCommit(connAutoCommit);
      } 
    }

下面代码展示了定时任务的调用:

public static final long PRE_READ_MS = 5000;    // pre read
  long nowTime = System.currentTimeMillis();
        //查询任务下一次执行时间<当前时间+5秒的任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
       for (XxlJobInfo jobInfo: scheduleList) {
         if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time,任务过期超过5秒,不在执行该任务,重新设置下一次执行时间
                                    // fresh next
                                    refreshNextValidTime(jobInfo, new Date());
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time,任务过期<5秒,立即执行任务
                                    // 1、trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    // 2、fresh next
                                    refreshNextValidTime(jobInfo, new Date());
          //省略部分代码
                       }

4.JobTriggerPoolHelper.trigger这个方法是通过第二步创建的线程池处理,将任务转给XxlJobTrigger.trigger方法。

ThreadPoolExecutor triggerPool_ = fastTriggerPool;
  triggerPool_.execute(new Runnable() {
      // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
  }

5.XxlJobTrigger.trigger这个方法首先根据jobId查询任务信息,接着根据jobGroup查询执行器信息,接着就是组装trigger-param,初始化address信息,最后就是调用ExecutorBizClient的run方法

// load data,加载任务信息
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
  if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
             processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);

核心逻辑在processTrigger中。

//初始化trigger-param
  TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
//初始化地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
//执行任务
 triggerResult = runExecutor(triggerParam, address);
 //日志处理,代码省略

6.ExecutorBizClient的run方法通过Netty Http调用EmbedServer的process方法

7.EmbedServer类的内部类EmbedHttpServerHandler的process方法会调用ExecutorBizImpl类的run方法。

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq){
  else if ("/run".equals(uri)) {
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                }
 }

8.ExecutorBizImpl类的run方法首先会创建JobThread,然后将任务放入triggerQueue(LinkedBlockingQueue)队列中,最后启动JobThread

public ReturnT<String> run(TriggerParam triggerParam) {
  JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
      // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
  return pushResult;
}

9.JobThread线程首先从triggerQueue中poll中任务,然后通过反射的话获取IJobHandler,调用其execute方法执行具体的任务。

public void run() {
    //通过反射的方式获取执行器的方法
    handler.init();
    //从队列中取出任务
    triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
    FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
        @Override
        public ReturnT<String> call() throws Exception {
          return 
          //执行任务 
          handler.execute(triggerParamTmp.getExecutorParams());
        }
        });
  }

任务执行完成之后,将执行结果放入回调的队列callBackQueue中。

总结

本文首先对xxl-job做一个基本的介绍,接着将xxl-job与quartz做一个比较,最后就是介绍xxl-job调度过程做了一个详细介绍,xxl-job是一个上手很容易,适用于分布式场景下使用,调度中心和执行器分开部署,减少了系统的耦合以及调度中心的调度效率。最重要的是xxl-job对任务的过期处理以及阻塞处理策略设计的比较好。

参考

XXL-JOB官方文档

定时任务框架:quartz、elastic-job和xxl-job的分析对比。

Quartz任务调度框架–简介与示例(一)


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
存储 Java BI
XXL-JOB定时任务知识点和应用实例
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。该处只是介绍xxl_job的一下基础知识和使用的实例,具体的安装调试请参照对应的最新的官方文档,中文开源地址:https://www.xuxueli.com/xxl-job
3659 0
|
4月前
|
数据采集 SQL JSON
《花100块做个摸鱼小网站! 》第五篇—通过xxl-job定时获取热搜数据
本文介绍了使用XXL-Job组件优化热搜数据定时更新的方法,实现了包括阿里云服务器部署、代码库下载、表结构初始化及启动等步骤,并详细展示了如何通过注解配置爬虫任务。文中通过具体示例(如抖音热搜)展示了如何将`@Scheduled`注解替换为`@XxlJob`注解,实现更灵活的任务调度。此外,还优化了前端展示,增加了热搜更新时间显示,并提供了B站热搜爬虫的实现方案。通过这些改进,使得热搜组件不仅功能完善,而且更加美观实用。详细代码可在作者提供的代码仓库中查看。
42 7
|
4月前
|
监控 Java API
死磕xxl-job(一)
死磕xxl-job(一)
|
4月前
|
SQL Shell 数据库连接
死磕xxl-job(二)
死磕xxl-job(二)
|
5月前
|
存储 监控 算法
XXL-JOB内部机制大揭秘:让任务调度飞起来
【8月更文挑战第14天】在大数据时代,高效的任务调度系统是支撑业务稳定运行与快速迭代的基石。XXL-JOB,作为一款轻量级、分布式任务调度平台,凭借其灵活的配置、强大的扩展性和高可用特性,在众多任务调度框架中脱颖而出。今天,我们就来深入揭秘XXL-JOB的内部机制,看看它是如何让任务调度“飞起来”的。
319 0
|
8月前
|
SQL Java 关系型数据库
【极光系列】springBoot集成xxl-job调度器
【极光系列】springBoot集成xxl-job调度器
93 2
|
缓存 Java 调度
xxl-job的原理(1)
xxl-job的原理(1)
148 0
|
负载均衡 Java API
分布式任务调度框架:XXL-JOB(入门篇)
将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“**调度中心**”负责发起调度请求。将任务抽象成分散的JobHandler,交由“执行器”统一管理,“**执行器**”负责接收调度请求并执行对应的JobHandler中业务逻辑。因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;
1082 0
分布式任务调度框架:XXL-JOB(入门篇)
|
监控 算法 Java
分布式任务调度框架XXL-JOB入门教程
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
182 0
|
存储 负载均衡 算法