分布式任务调度框架XXL-JOB入门教程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

1、概述

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

官方文档:https://www.xuxueli.com/xxl-job/#%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E5%85%A5%E9%97%A8

  • 官网地址:https://www.xuxueli.com/xxl-job/
  • GitHub地址:https://github.com/xuxueli/xxl-job/
  • xxl-job的设计思想

    • 将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
    • 将任务抽象成分散的JobHandler,交由“执行器”统一管理
    • “执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
    • 因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性

    img

  • 架构图(图片来源是xxl-job官网)

    • 调度中心

      • 负责管理调度的信息,按照调度的配置来发出调度请求
      • 支持可视化、简单的动态管理调度信息,包括新建、删除、更新等,这些操作都会实时生效,同时也支持监控调度结果以及执行日志。
    • 执行器

      • 负责接收请求并且执行任务的逻辑。任务模块专注于任务的执行操作等等,使得开发和维护更加的简单与高效
  • XXL-Job具有哪些特性

    • 调度中心HA(中心式):调度采用了中心式进行设计,“调度中心”支持集群部署,可保证调度中心HA
    • 执行器HA(分布式):任务分布式的执行,任务执行器支持集群部署,可保证任务执行HA
    • 触发策略:有Cron触发、固定间隔触发、固定延时触发、API事件触发、人工触发、父子任务触发
    • 路由策略:执行器在集群部署的时候提供了丰富的路由策略,如:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用LFU、最久未使用LRU、故障转移等等
    • 故障转移:如果执行器集群的一台机器发生故障,会自动切换到一台正常的执行器发送任务调度
    • Rolling实时日志的监控:支持rolling方式查看输入的完整执行日志
    • 脚本任务:支持GLUE模式开发和运行脚本任务,包括Shell、python、node.js、php等等类型脚本

2、搭建调度中心

1、下载源码

下载源码导入idea,源码地址:https://gitee.com/xuxueli0323/xxl-job.git

  • doc:xxl-job的文档资料,包括了数据库的脚本(后面要用到)
  • xxl-job-core:公共jar包依赖
  • xxl-job-admin:调度中心,项目源码,是Springboot项目,可以直接启动
  • xxl-job-executor-samples:执行器,是Sample实例项目,里面的Springboot工程可以直接启动,也可以在该项目的基础上进行开发,也可以将现有的项目改造成为执行器项目

2、数据库

数据库文件在源码doc/db目录下

  • xxl_job的数据库里有如下几个表

    • xxl_job_group:执行器信息表,用于维护任务执行器的信息
    • xxl_job_info:调度扩展信息表,主要是用于保存xxl-job的调度任务的扩展信息,比如说像任务分组、任务名、机器的地址等等
    • xxl_job_lock:任务调度锁表
    • xxl_job_log:日志表,主要是用在保存xxl-job任务调度历史信息,像调度结果、执行结果、调度入参等等
    • xxl_job_log_report:日志报表,会存储xxl-job任务调度的日志报表,会在调度中心里的报表功能里使用到
    • xxl_job_logglue:任务的GLUE日志,用于保存GLUE日志的更新历史变化,支持GLUE版本的回溯功能
    • xxl_job_registry:执行器的注册表,用在维护在线的执行器与调度中心的地址信息
    • xxl_job_user:系统的用户表

3、调度中心配置:

调度中心配置文件地址:

/xxl-job/xxl-job-admin/src/main/resources/application.properties
  • 配置数据库连接
## xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=xdclass.net
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
  • 配置 xxl.job.accessToken(后续要配置客户端接入配置token)
xxl.job.accessToken=javaxiaobear.cn

4、启动项目

调度中心访问地址:http: //localhost:8080/xxl-job-admin (该地址执行器将会使用到,作为回调地址)

默认登录账号 “admin/123456”, 登录后运行界面如下图所示。

image-20230630144053744

5、UI界面介绍

1、运行报表:以图形化来展示了整体的任务执行情况

  • 任务数量:能够看到调度中心运行的任务数量
  • 调度次数:调度中心所触发的调度次数
  • 执行器数量:在整个调度中心中,在线的执行器数量有多少

2、任务管理(配置执行任务)

image-20230630144304198

  • 示例执行器:所用到的执行器
  • 任务描述:概述该任务是做什么的
  • 路由策略:

    • 第一个:选择第一个机器
    • 最后一个:选择最后一个机器
    • 轮询:依次选择执行
    • 随机:随机选择在线的机器
    • 一致性HASH:每个任务按照Hash算法固定选择某一台机器,并且所有的任务均匀散列在不同的机器上
    • 最不经常使用:使用频率最低的机器优先被使用
    • 最近最久未使用:最久未使用的机器优先被选举
    • 故障转移:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标的执行器并且会发起任务调度
    • 忙碌转移:按照顺序来依次进行空闲检测,第一个空闲检测成功的机器会被选定为目标群机器,并且会发起任务调度
    • 分片广播:广播触发对于集群中的所有机器执行任务,同时会系统会自动传递分片的参数
  • Cron:执行规则
  • 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等
  • JobHandler:定义执行器的名字
  • 阻塞处理策略:

    • 单机串行:新的调度任务在进入到执行器之后,该调度任务进入FIFO队列,并以串行的方式去进行
    • 丢弃后续调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,本次的调度任务请求就会被丢弃掉,并且标记为失败
    • 覆盖之前的调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,就会终止掉当前正在运行的调度任务,并且清空队列,运行新的调度任务。
  • 子任务ID:输入子任务的任务id,可填写多个
  • 任务超时时间:添加任务超时的时候,单位s,设置时间大于0的时候就会生效
  • 失败重试次数:设置失败重试的次数,设置时间大于0的时候就会生效
  • 负责人:填写该任务调度的负责人
  • 报警邮件:出现报警,则发送邮件

3、调度日志

  • 这里是查看调度的日志,根据日志来查看任务具体的执行情况是怎样的

4、执行器管理

  • 这里是配置执行器,等待执行器启动的时候都会被调度中心监听加入到地址列表

5、用户管理

  • 可以对用户的一些操作

3、整合xxl_job

1、项目搭建

1、引入xxl_job依赖

<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.1</version>
</dependency>

2、配置yaml

xxl:
  job:
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
      # 执行器的名字
    executor:
      appname: javaxiaobear-xxl-job-test
    accessToken: default_token
server:
  port: 8081

3、编写配置类

@Configuration
@Slf4j
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appName;

    @Value("${xxl.job.accessToken}")
    private String accessToken;


    //旧版的有bug
    //@Bean(initMethod = "start", destroyMethod = "destroy")
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appName);
//        xxlJobSpringExecutor.setIp(ip);
//        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
//        xxlJobSpringExecutor.setLogPath(logPath);
//        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

2、第一个XXL-Job分布式调度任务

1、界面新增一个任务

image-20230630152704891

2、新增一个执行器

这里是因为在配置文件里面用的是自定义的执行器,所以我们需要新增,当然,你可以用默认的执行器

image-20230630154909704

3、代码配置handler

@Slf4j
@Component
public class MyXxlJobHandler {

    @XxlJob("myXxlJobHandler")
    public ReturnT<String> execute(String param){
        log.info("小熊学Java 任务方法触发成功");
        return ReturnT.SUCCESS;
    }
}

4、重新启动项目

image-20230630155107647

image-20230630155113771

4、路由策略

  • 第一个:选择第一个机器
  • 最后一个:选择最后一个机器
  • 轮询:依次选择执行,流量均摊(推荐)
  • 随机:随机选择在线的机器
  • 一致性HASH:每个任务按照Hash算法固定选择某一台机器,并且所有的任务均匀散列在不同的机器上
  • 最不经常使用:使用频率最低的机器优先被使用
  • 最近最久未使用:最久未使用的机器优先被选举
  • 故障转移:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标的执行器并且会发起任务调度
  • 忙碌转移:按照顺序来依次进行空闲检测,第一个空闲检测成功的机器会被选定为目标群机器,并且会发起任务调度
  • 分片广播:广播触发对于集群中的所有机器执行任务,同时会系统会自动传递分片的参数

5、分片广播

1、场景

需求

  • 有一个任务需要处理100W条数据,每条数据的业务逻辑处理要0.1s
  • 对于普通任务来说,只有一个线程来处理 可能需要10万秒才能处理完,业务则严重受影响
  • 案例:双十一大促,给1000万用户发营销短信

什么是分片任务

  • 执行器集群部署,如果任务的路由策略选择【分片广播】,一次任务调度将会【广播触发】对应集群中所有执行器执行一次任务,同时系统自动传递分片参数,执行器可根据分片参数开发分片任务
  • 需要处理的海量数据,以执行器为划分,每个执行器分配一定的任务数,并行执行
  • XXL-Job支持动态扩容执行器集群,从而动态增加分片数量,到达更快处理任务
  • 分片的值是调度中心分配的
// 当前分片数,从0开始,即执行器的序号
int shardIndex = XxlJobHelper.getShardIndex();
//总分片数,执行器集群总机器数量
int shardTotal = XxlJobHelper.getShardTotal();

image-20230703140222054

解决思路

  1. 分片广播
  2. 也可以启动多个job,使用同个jobHandler,通过命令行参数控制
  • 如果将100W数据均匀分给集群里的10台机器同时处理,
  • 每台机器耗时,1万秒即可,耗时会大大缩短,也能充分利用集群资源
  • 在xxl-job里,可以配置执行器集群有10个机器,那么分片总数是10,分片序号0~9 分别对应那10台机器。
  • 分片方式

    • id % 分片总数 余数是0 的,在第1个执行器上执行
    • id % 分片总数 余数是1 的,在第2个执行器上执行
    • id % 分片总数 余数是2 的,在第3个执行器上执行
    • ...
    • id % 分片总数 余数是9 的,在第10个执行器上执行

2、代码编写

/**
     * 100个用户,分片处理
     */
    @XxlJob("myShardingJobHandler")
    public void shardingJobHandler(){
        // 当前分片数,从0开始,即执行器的序号
        int shardIndex = XxlJobHelper.getShardIndex();
        //总分片数,执行器集群总机器数量
        int shardTotal = XxlJobHelper.getShardTotal();

        XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);


        List<Integer> allUserIds = getAllUserIds();
        allUserIds.forEach(obj -> {
            if (obj % shardTotal == shardIndex) {
                log.info("第 {} 片, 命中分片开始处理用户id={}",shardIndex,obj);
            }
        });
    }

    private List<Integer> getAllUserIds() {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ids.add(i);
        }
        return ids;
    }

3、测试

  1. 新建一个分片任务

image-20230703141009035

  1. 新建一个服务器实例,直接copy一份

    image-20230703141109854

  2. 启动系统,点击运行一次

    image-20230703141221278

6、阻塞策略

  • 单机串行:新的调度任务在进入到执行器之后,该调度任务进入FIFO队列,并以串行的方式去进行
  • 丢弃后续调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,本次的调度任务请求就会被丢弃掉,并且标记为失败
  • 覆盖之前的调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,就会终止掉当前正在运行的调度任务,并且清空队列,运行新的调度任务。

好了,本文就到这里了!如果觉得内容不错的话,希望大家可以帮忙点赞转发一波,这是对我最大的鼓励,感谢🙏🏻

资料获取👇 最后面就是领取暗号,公众号回复即可!

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
2月前
|
负载均衡 算法 调度
基于遗传算法的新的异构分布式系统任务调度算法研究(Matlab代码实现)
基于遗传算法的新的异构分布式系统任务调度算法研究(Matlab代码实现)
174 11
|
8月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
864 0
分布式爬虫框架Scrapy-Redis实战指南
|
6月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
647 4
|
11月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
4142 66
|
10月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
530 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
10月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
474 8
|
11月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
421 2
|
12月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
259 1
|
3月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
317 2
|
3月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
253 6

热门文章

最新文章

下一篇
oss云网关配置