分布式定时任务调度框架实践

本文涉及的产品
云原生网关 MSE Higress,422元/月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 分布式定时任务调度框架实践

分布式任务调度框架几乎是每个大型应用必备的工具,本文介绍了任务调度框架使用的需求背景和痛点,对业界普遍使用的开源分布式任务调度框架的使用进行了探究实践,并分析了这几种框架的优劣势和对自身业务的思考。

分布式定时任务调度框架实践

  • 结语
  • 以下文章来源于vivo互联网技术 ,作者陈王荣

一、业务背景

1.1 为什么需要使用定时任务调度

(1)时间驱动处理场景: 整点发送优惠券,每天更新收益,每天刷新标签数据和人群数据。

(2)批量处理数据: 按月批量统计报表数据,批量更新短信状态,实时性要求不高。

(3)异步执行解耦: 活动状态刷新,异步执行离线查询,与内部逻辑解耦。

1.2 使用需求和痛点

(1)任务执行监控告警能力。

(2)任务可灵活动态配置,无需重启。

(3)业务透明,低耦合,配置精简,开发方便。

(4)易测试。

(5)高可用,无单点故障。

(6)任务不可重复执行,防止逻辑异常。

(7)大任务的分发并行处理能力。

二、开源框架实践与探索

2.1 Java 原生 Timer 和

ScheduledExecutorService

2.1.1 Timer使用

Timer缺陷:

  1. Timer底层是使用单线程来处理多个Timer任务,这意味着所有任务实际上都是串行执行,前一个任务的延迟会影响到之后的任务的执行。
  2. 由于单线程的缘故,一旦某个定时任务在运行时,产生未处理的异常,那么不仅当前这个线程会停止,所有的定时任务都会停止。
  3. Timer任务执行是依赖于系统绝对时间,系统时间变化会导致执行计划的变更。

由于上述缺陷,尽量不要使用Timer, idea中也会明确提示,使用ScheduledThreadPoolExecutor替代Timer 。

2.1.2 ScheduledExecutorService使用

ScheduledExecutorService对于Timer的缺陷进行了修补,首先ScheduledExecutorService内部实现是ScheduledThreadPool线程池,可以支持多个任务并发执行。

对于某一个线程执行的任务出现异常,也会处理,不会影响其他线程任务的执行,另外ScheduledExecutorService是基于时间间隔的延迟,执行不会由于系统时间的改变发生变化。

当然,ScheduledExecutorService也有自己的局限性:只能根据任务的延迟来进行调度,无法满足基于绝对时间和日历调度的需求。

2.2 Spring Task

2.2.1 Spring Task 使用

spring task 是spring自主开发的轻量级定时任务框架,不需要依赖其他额外的包,配置较为简单。

此处使用注解配置

2.2.2 Spring Task缺陷

Spring Task 本身不支持持久化,也没有推出官方的分布式集群模式,只能靠开发者在业务应用中自己手动扩展实现,无法满足可视化,易配置的需求。

2.3 永远经典的 Quartz

2.3.1 基本介绍

Quartz框架是Java领域最著名的开源任务调度工具,也是目前事实上的定时任务标准,几乎全部的开源定时任务框架都是基于Quartz核心调度构建而成。

2.3.2 原理解析

核心组件和架构

关键概念

(1) Scheduler :任务调度器,是执行任务调度的控制器。本质上是一个计划调度容器,注册了全部Trigger和对应的JobDetail, 使用线程池作为任务运行的基础组件,提高任务执行效率。

(2) Trigger :触发器,用于定义任务调度的时间规则,告诉任务调度器什么时候触发任务,其中CronTrigger是基于cron表达式构建的功能强大的触发器。

(3) Calendar :日历特定时间点的集合。一个trigger可以包含多个Calendar,可用于排除或包含某些时间点。

(4) JobDetail :是一个可执行的工作,用来描述Job实现类及其它相关的静态信息,如Job的名称、监听器等相关信息。

(5) Job :任务执行接口,只有一个execute方法,用于执行真正的业务逻辑。

(6) JobStore :任务存储方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是存储在JVM的内存中,有丢失和数量受限的风险,JDBCJobStore是将任务信息持久化到数据库中,支持集群。

2.3.3 实践说明

(1)关于Quartz的基本使用

可参考Quartz官方文档和网上博客实践教程。

(2)业务使用要满足动态修改和重启不丢失, 一般需要使用数据库进行保存。

Quartz本身支持JDBCJobStore,但是其配置的数据表比较多,官方推荐配置可参照官方文档,超过10张表,业务使用比较重。

在使用的时候只需要存在基本trigger配置和对应任务以及相关执行日志的表即可满足绝大部分需求。

(3)组件化

将quartz动态任务配置信息持久化到数据库,将数据操作包装成基本jar包,供项目之间使用,引用项目只需要引入jar包依赖和配置对应的数据表,使用时就可以对Quartz配置透明。

(4)扩展

集群模式

通过故障转移和负载均衡实现了任务的高可用性,通过数据库的锁机制来确保任务执行的唯一性,但是集群特性仅仅只是用来HA,节点数量的增加并不会提升单个任务的执行效率,不能实现水平扩展。

Quartz插件

可以对特定需要进行扩展,比如增加触发器和任务执行日志,任务依赖串行处理场景,可参考:quartz插件——实现任务之间的串行调度

2.3.4 缺陷和不足

(1)需要把任务信息持久化到业务数据表,和业务有耦合。

(2)调度逻辑和执行逻辑并存于同一个项目中,在机器性能固定的情况下,业务和调度之间不可避免地会相互影响。

(3)quartz集群模式下,是通过数据库独占锁来唯一获取任务,任务执行并没有实现完善的负载均衡机制。

2.4 轻量级神器 XXLJOB

2.4.1 基本介绍

XXLJOB是一个轻量级分布式任务调度平台,主打特点是平台化,易部署,开发迅速、学习简单、轻量级、易扩展,代码仍在持续更新中。

“调度中心”是任务调度控制台,平台自身并不承担业务逻辑,只是负责任务的统一管理和调度执行,并且提供任务管理平台, “执行器” 负责接收“调度中心”的调度并执行,可直接部署执行器,也可以将执行器集成到现有业务项目中。 通过将任务的调度控制和任务的执行解耦,业务使用只需要关注业务逻辑的开发。

主要提供了任务的动态配置管理、任务监控和统计报表以及调度日志几大功能模块,支持多种运行模式和路由策略,可基于对应执行器机器集群数量进行简单分片数据处理。

2.4.2 原理解析

2.1.0版本前核心调度模块都是基于quartz框架,2.1.0版本开始自研调度组件,移除quartz依赖 ,使用时间轮调度。

2.4.3 实践说明

详细配置和介绍参考官方文档。

2.4.3.1 demo使用:

示例1:实现简单任务配置,只需要继承IJobHandler 抽象类,并声明注解

@JobHandler(value=“offlineTaskJobHandler”) ,实现业务逻辑即可。(注:此次引入了dubbo,后文介绍)。

@JobHandler(value = "offlineTaskJobHandler")
@Componentpublic
class OfflineTaskJobHandler extends IJobHandler {
    @Reference(check = false, version = "cmsdev", group = "cmsservice")
    private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
    @Override
    public ReturnT<String> execute(String param) throws Exception {
        XxlJobLogger.log(" offlineTaskJobHandler start.");
        try {
            offlineTaskExecutorFacade.executeOfflineTask();
        } catch (Exception e) {
            XxlJobLogger.log("offlineTaskJobHandler>exception.", e);
            return FAIL;
        }
        XxlJobLogger.log("XXLJOB, offlineTaskJobHandler end.");
        return SUCCESS;
    }
}

示例2:分片广播任务。

@JobHandler(value = "shardingJobHandler")
@Servicepublic
class ShardingJobHandler extends IJobHandler {
    @Override
    public ReturnT<String> execute(String param) throws Exception {        // 分片参数   
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());        // 业务逻辑   
        for (int i = 0; i < shardingVO.getTotal(); i++) {
            if (i == shardingVO.getIndex()) {
                XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
            } else {
                XxlJobLogger.log("第 {} 片, 忽略", i);
            }
        }
        return SUCCESS;
    }
}

2.4.3.2 整合dubbo

(1)引入dubbospringbootstarter和业务facade jar包依赖。

<dependency>  
  <groupId>com.alibaba.spring.boot</groupId>   
   <artifactId>dubbospringbootstarter</artifactId>   
    <version>2.0.0</version>
    </dependency>
      <dependency>  
        <groupId>com.demo.service</groupId>  
          <artifactId>xxxfacade</artifactId>  
            <version>1.9SNAPSHOT</version>
            </dependency>

(2)配置文件加入dubbo消费端配置(可根据环境定义多个配置文件,通过profile切换)。

## Dubbo 服务消费者配置
spring.dubbo.application.name=xxljob  
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880  
spring.dubbo.version=demospring.dubbo.group=demoservice

(3)代码中通过@Reference注入facade接口即可。

@Reference(check = false,version = "demo",group="demoservice")private OfflineTaskExecutorFacade offlineTaskExecutorFacade;

(4)启动程序加入@EnableDubboConfiguration注解。

@SpringBootApplication@EnableDubboConfigurationpublic class XxlJobExecutorApplication {  
 public static void main(String[] args) {      
   SpringApplication.run(XxlJobExecutorApplication.class, args); 
     }}

2.4.4 任务可视化配置

内置了平台项目,方便了开发者对任务的管理和执行日志的监控,并提供了一些便于测试的功能。

2.4.5 扩展

(1)任务监控和报表的优化。

(2)任务报警方式的扩展,比如加入告警中心,提供内部消息,短信告警。

(3)对实际业务内部执行出现异常情况下的不同监控告警和重试策略。

2.5 高可用 ElasticJob

2.5.1 基本介绍

ElasticJob是一个分布式调度解决方案,由两个相互独立的子项目ElasticJobLite和ElasticJobCloud组成。

ElasticJobLite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

ElasticJobCloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。

可惜的是已经两年没有迭代更新记录。

2.5.2 原理解析

2.5.3 实践说明

2.5.3.1 demo使用

(1)安装zookeeper,配置注册中心config,配置文件加入注册中心zk的配置。

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
spring.application.name=demo_elasticjob  regCenter.serverList=localhost:2181regCenter.namespace=demo_elasticjob  spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxljob?Unicode=true&characterEncoding=UTF8spring.datasource.username=userspring.datasource.password=pwd

(2)配置数据源config,并配置文件中加入数据源配置。

import jdk.nashorn.internal.objects.annotations.Getter;
import jdk.nashorn.internal.objects.annotations.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
    private String url;
    private String username;
    private String password;
    @Bean
    @Primary
    public DataSource getDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }
}
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxljob?Unicode=truecharacterEncoding=UTF8spring.datasource.username=userspring.datasource.password=pwd

(3)配置事件config。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@Configurationpublic
class JobEventConfig {
    @Autowired
    private DataSource dataSource;
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}

(4)为了便于灵活配置不同的任务触发事件,加入ElasticSimpleJob注解。

import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.RetentionPolicy;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
    @AliasFor("cron") String value() default "";
    @AliasFor("value") String cron() default "";
    String jobName() default "";
    int shardingTotalCount() default 1;
    String shardingItemParameters() default "";
    String jobParameter() default "";
}

(5)对配置进行初始化。

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Map;
@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.serverlists}'.length() > 0")
public class ElasticJobAutoConfiguration {
    @Value("${regCenter.serverList}")
    private String serverList;
    @Value("${regCenter.namespace}")
    private String namespace;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private DataSource dataSource;
    @PostConstruct
    public void initElasticJob() {
        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        regCenter.init();
        Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
        for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
            SimpleJob simpleJob = entry.getValue();
            ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
            String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
            JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
            SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
            jobScheduler.init();
        }
    }
}

(6)实现 SimpleJob接口,按上文中方法整合dubbo, 完成业务逻辑。

@ElasticSimpleJob(cron = "*/10 * * * * ?", jobName = "OfflineTaskJob", shardingTotalCount = 2, jobParameter = "测试参数", shardingItemParameters = "0=A,1=B")
@Componentpublic
class MySimpleJob implements SimpleJob {
    Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
    @Reference(check = false, version = "cmsdev", group = "cmsservice")
    private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
    @Override
    public void execute(ShardingContext shardingContext) {
        offlineTaskExecutorFacade.executeOfflineTask();
        logger.info(String.format("Thread ID: %s, 作业分片总数: %s, " + "当前分片项: %s.当前参数: %s," + "作业名称: %s.作业自定义参数: %s", Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter()));
    }
}

2.6 其余开源框架

(1) Saturn :Saturn是唯品会开源的一个分布式任务调度平台,在Elastic Job的基础上进行了改造。

(2) SIATASK :是宜信开源的分布式任务调度平台。

三、优劣势对比和业务场景适配思考

业务思考:

  1. 丰富任务监控数据和告警策略。
  2. 接入统一登录和权限控制。
  3. 进一步简化业务接入步骤。

四、总语

对于并发场景不是特别高的系统来说,xxljob配置部署简单易用,不需要引入多余的组件,同时提供了可视化的控制台,使用起来非常友好,是一个比较好的选择。希望直接利用开源分布式框架能力的系统,建议根据自身的情况来进行合适的选型。

附:参考文献

quartz插件——实现任务之间的串行调度

结语

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、评论、收藏➕关注,您的支持是我坚持写作最大的动力。

目录
相关文章
|
8天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
36 2
|
16天前
|
运维 Kubernetes 调度
阿里云容器服务 ACK One 分布式云容器企业落地实践
阿里云容器服务ACK提供强大的产品能力,支持弹性、调度、可观测、成本治理和安全合规。针对拥有IDC或三方资源的企业,ACK One分布式云容器平台能够有效解决资源管理、多云多集群管理及边缘计算等挑战,实现云上云下统一管理,提升业务效率与稳定性。
|
23天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
56 4
|
26天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
57 8
|
18小时前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
96 66
|
2月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
4月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
131 2
基于Redis的高可用分布式锁——RedLock
|
23天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
66 5
|
1月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
59 16
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
43 5

热门文章

最新文章