为什么要引入分布式任务调度系统?

简介: 在开发中,定时任务是一种十分常见的应用场景,比如每天晚上12点同步数据,又或者每隔一个小时拉取一次数据。在Java中,实现定时任务的方式有很多,最简单的在线程中通过Thread.sleep睡眠线程,或者采用SpringBoot中的@Schedule注解,又或者采用定时线程池ScheduledExecutorService来实现。

本文收录于JavaStarter ,里面有我完整的Java系列文章,学习或面试都可以看看

本文将介绍分布式任务调度系统:xxl-job,开源地址如下:


码云地址:gitee.com/xuxueli0323…

文档地址:www.xuxueli.com/xxl-job/


(一)定时任务的场景


在开发中,定时任务是一种十分常见的应用场景,比如每天晚上12点同步数据,又或者每隔一个小时拉取一次数据。


在Java中,实现定时任务的方式有很多,最简单的在线程中通过Thread.sleep睡眠线程,或者采用SpringBoot中的@Schedule注解,又或者采用定时线程池ScheduledExecutorService来实现。


(二)上面的定时任务会有什么问题?


在单机环境下,上面的这种定时任务实现方式问题主要有一个,无法进行管理,没有容错机制。


但是在集群环境下,如果不对代码作控制,就会导致集群的每一台机器都会执行一次定时任务。


常见的解决方式,我通过配置文件进行控制,只让定时任务在某一台机器上执行,如果项目比较小,就几台机器组成的集群环境,这样的方式确实可以,只不过在任务的管理上需要想办法解决。


如果是一个很庞大的分布式微服务系统,可能会有成千上万个定时任务,那上面的方法就不合理了。因此许多互联网公司会采用分布式任务调度系统,主要为了实现高可用、容错管理、负载均衡、管理机制等功能,我目前所在公司使用的是xxl-job作为分布式任务调度平台。


(三)xxl-job的使用


xxl-job的一大优势就是使用简单,学习成本低,xxl-job作者已经给出了很详细的使用说明,下面我们就通过源码直接来跑一下。


3.1 初始化调度数据库


git上clone的项目中保存了初始化sql脚本,位置在:

/xxl-job/doc/db/tables_xxl_job.sql

执行完毕后会在数据库中新建库以及表结构。


3.2 修改配置


修改主配置文件:

/xxl-job/xxl-job-admin/src/main/resources/application.properties

主要修改jdbc的连接信息,以及报警邮件,xxl-job支持通过邮件报警的方式。


网络异常,图片无法展示
|


3.3 运行项目


直接运行xxl-job-admin中的XxlJobAdminApplication,正常启动后访问http://localhost:8080/xxl-job-admin,输入用户名密码:admin/123456,然后就能看到任务调度中心页面了


网络异常,图片无法展示
|


到这里为止,xxl-job的管理平台已经搭建完成,接下来展示客户端使用xxl-job的案例。xxl-job支持多种执行方式,我这里演示Java Bean的使用。其余的可看官方提供的技术文档。


3.4 配置执行器


执行器管理页面点击新增执行器:

AppName: 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;
名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;
排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式:调度中心获取执行器地址的方式;自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;机器地址:"注册方式""手动录入"时有效,支持人工维护执行器的地址信息;

我这里选择手动录入,并且自己输入执行器的地址,ip是本机ip,端口选择一个未使用过的端口。


网络异常,图片无法展示
|


3.5 编写客户端代码


接下来编写客户端的代码,在xxl开源项目中,已经有springboot的demo,我们自己写一个。


第一步引入依赖:

<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>${project.parent.version}</version></dependency>

这里的version填写最新的稳定版本,因为我在xxljob的开源项目中新建了一个module进行测试,因此直接用父项目版本了。


第二步编写配置文件:

server.port=8081xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accessToken=xxl.job.executor.appname=test-xxl-jobxxl.job.executor.address=xxl.job.executor.ip=172.18.2.49xxl.job.executor.port=8999xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30

server.port=8081

xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

xxl.job.accessToken=

xxl.job.executor.appname=test-xxl-job

xxl.job.executor.address=

xxl.job.executor.ip=172.18.2.49

xxl.job.executor.port=8999

xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

xxl.job.executor.logretentiondays=30

@ConfigurationpublicclassXxlConfig {
privateLoggerlogger=LoggerFactory.getLogger(XxlConfig.class);
@Value("${xxl.job.admin.addresses}")
privateStringadminAddresses;
@Value("${xxl.job.accessToken}")
privateStringaccessToken;
@Value("${xxl.job.executor.appname}")
privateStringappname;
@Value("${xxl.job.executor.address}")
privateStringaddress;
@Value("${xxl.job.executor.ip}")
privateStringip;
@Value("${xxl.job.executor.port}")
privateintport;
@Value("${xxl.job.executor.logpath}")
privateStringlogPath;
@Value("${xxl.job.executor.logretentiondays}")
privateintlogRetentionDays;
@BeanpublicXxlJobSpringExecutorxxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutorxxlJobSpringExecutor=newXxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
returnxxlJobSpringExecutor;
    }
}

第四步编写demo代码:

@ComponentpublicclassDemoJobHandler {
@XxlJob("demoJobHandler")
publicvoiddemoJobHandler(){
System.out.println("执行定时任务");
XxlJobHelper.log("执行定时任务");
    }
}

通过@XxlJob("demoJobHandler"),指定任务的名称。


3.5 配置任务


代码写好了,接下来配置具体的任务了,进管理平台的任务管理,在test执行器下新建一个任务,简单如下配置:


网络异常,图片无法展示
|


Cron配置了每10s执行一次,配置任务完成后启动任务,定时Job就开始工作了,通过日志可以查看是否执行成功。


(四)xxl-job集群下的使用


既然被称为分布式任务调度平台,xxl-job如何体现分布式场景下的任务调度呢?在任务配置的高级配置中,提供了多种路由策略:


网络异常,图片无法展示
|


我现在选择轮询,然后修改一下执行器的配置,加入两个地址:


http://172.18.2.49:8999,http://172.18.2.49:8998

同时将测试项目启动两个,两者的配置文件分别为:

#第一个项目server.port=8081xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accessToken=xxl.job.executor.appname=test-xxl-jobxxl.job.executor.address=xxl.job.executor.ip=172.18.2.49xxl.job.executor.port=8999xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30#第二个项目server.port=8082xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accessToken=xxl.job.executor.appname=test-xxl-jobxxl.job.executor.address=xxl.job.executor.ip=172.18.2.49xxl.job.executor.port=8998xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30

再启动任务后,会发现定时任务会在两个项目中轮询的执行:


网络异常,图片无法展示
|
网络异常,图片无法展示
|


除了轮询之外,像故障转移、忙碌转移策略可以实现容错,一致性哈希可以保证同一个任务只在一台机器上执行。


(五)总结


目前分布式任务调度的开源框架有很多,xxl-job是最常用的,功能确实很完善,同时完全开源。也难怪大量互联网企业在使用它。我是鱼仔,我们下期再见!



相关文章
|
24天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
57 4
|
1月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
119 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
1月前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
41 0
|
2月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
68 3
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
87 2
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
3月前
|
存储 块存储
ceph分布式存储系统常见术语篇
关于Ceph分布式存储系统的常见术语解释和概述。
171 1
ceph分布式存储系统常见术语篇
|
2月前
|
存储 分布式计算 监控
C# 创建一个分布式文件存储系统需要怎么设计??
C# 创建一个分布式文件存储系统需要怎么设计??
44 0