分布式任务处理:XXL-JOB分布式任务调度框架(一)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 分布式任务处理:XXL-JOB分布式任务调度框架


❓ 如何去高效处理一批任务

分布式任务调度的处理方案:分布式加多线程,充分利用多台计算机,每台计算机使用多线程处理。

1.业务场景与任务调度

我们可以先思考一下下面业务场景的解决方案:

  • 某电商系统需要在每天上午10点,下午3点,晚上8点发放一批优惠券。
  • 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
  • 某电商平台每天凌晨3点,要对订单中的无效订单进行清理。
  • 12306网站会根据车次不同,设置几个时间点分批次放票。
  • 电商整点抢购,商品价格某天上午8点整开始优惠。
  • 商品成功发货后,需要向客户发送短信提醒。

类似的场景还有很多,我们该如何实现?以上这些场景,就是任务调度所需要解决的问题。

📖 任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。

2.任务调度的基本实现

2.1 多线程方式实现

我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。

以下代码简单实现了任务调度的功能:

/**
 * @author 狐狸半面添
 * @create 2023-02-16 13:15
 */
public class ThreadTaskDemo {
    public static void main(String[] args) {
        // 指定任务执行间隔时间(单位:ms)
        final long timeInterval = 1000;
        Runnable runnable = new Runnable() {
            public void run() {
                while (true) {
                    // TODO 需要执行的任务
                    System.out.println("多线程方式任务调度:每隔1s执行一次任务");
                    try {
                        Thread.sleep(timeInterval);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread = new Thread(runnable);
        // 线程执行,开启定时任务
        thread.start();
    }
}

上面的代码实现了按一定的间隔时间执行任务调度的功能。

Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,如下👇

2.2 Timer方式实现

Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。

import java.util.Timer;
import java.util.TimerTask;
/**
 * @author 狐狸半面添
 * @create 2023-02-17 15:18
 */
public class TimerTaskDemo {
    public static void main(String[] args) {
        Timer timer = new Timer();
        // 1秒后开始任务调度,每2秒执行一次任务
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // TODO 需要执行的任务
                System.out.println("Timer方式任务调度:每隔2s执行一次任务");
            }
        }, 1000, 2000);
    }
}

2.3 ScheduledExecutor方式实现

Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * @author 狐狸半面添
 * @create 2023-02-17 15:22
 */
public class ScheduledExecutorTaskDemo {
    /**
     * 设置线程池的线程数量
     */
    private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
    public static void main(String[] args) {
        // 第一个任务调度
        executor.scheduleAtFixedRate(
                // 可以使用匿名内部类方式创建一个Runnable实现类,也可以new一个类实现Runnable接口
                new Runnable() {
                    @Override
                    public void run() {
                        // todo 需要执行的任务
                        System.out.println("任务一 定时调度中");
                    }
                },
                // 0秒后开始任务调度,每隔1秒执行一次任务
                0, 1, TimeUnit.SECONDS
        );
        // 第二个任务调度
        executor.scheduleAtFixedRate(
                // 可以使用匿名内部类方式创建一个Runnable实现类,也可以new一个类实现Runnable接口
                new Task(),
                // 500毫秒后开始任务调度,每隔2000毫秒执行一次任务
                500, 2000, TimeUnit.MILLISECONDS
        );
    }
    static class Task implements Runnable {
        @Override
        public void run() {
            // todo 需要执行的任务
            System.out.println("任务二 定时调度中");
        }
    }
}

2.4 第三方Quartz方式实现

TimerScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,对于比较复杂的调度需求,比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等,实现起来比较麻烦。

Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。

<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
/**
 * @author 狐狸半面添
 * @create 2023-02-17 15:47
 */
public class QuartzDemo {
    public static void main(String[] agrs) throws SchedulerException {
        // 创建一个Scheduler
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        // 创建JobDetail
        JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
        jobDetailBuilder.withIdentity("jobName", "jobGroupName");
        JobDetail jobDetail = jobDetailBuilder.build();
        // 创建触发的CronTrigger 支持按日历调度
        CronTrigger trigger = (CronTrigger) TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "triggerGroupName")
                .startNow()
                // 每隔两秒执行一次
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
                .build();
        //创建触发的SimpleTrigger 简单的间隔调度
        /*
            SimpleTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName","triggerGroupName")
                .startNow()
                .withSchedule(SimpleScheduleBuilder
                        .simpleSchedule()
                        .withIntervalInSeconds(2)
                        .repeatForever())
                .build();
        */
        scheduler.scheduleJob(jobDetail, (Trigger) trigger);
        scheduler.start();
    }
    public static class MyJob implements Job {
        @Override
        public void execute(JobExecutionContext jobExecutionContext) {
            // todo 需要定时调度的任务
            System.out.println("定时任务正在调度执行");
        }
    }
}

3.分布式任务调度

通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:

🚩 分布式调度要实现的目标:

不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:

  1. 并行任务调度:并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
    如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
  2. 高可用:若某一个实例宕机,不影响其他实例来执行任务。
  3. 弹性扩容:当集群中增加实例就可以提高并执行任务的处理效率。
  4. 任务管理与监测:对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
  5. 避免任务重复执行:当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。

4.XXL-JOB介绍

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

🏠 官网:https://www.xuxueli.com/xxl-job/

📖 文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B

XXL-JOB主要有调度中心执行器任务

🍀 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。主要职责为执行器管理、任务管理、监控运维、日志管理等。

🍀 任务执行器:负责接收调度请求并执行任务逻辑。只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等。

🍀 任务:负责执行具体的业务处理。

🚩 调度中心与执行器之间的工作流程如下:

📍 执行流程:

  1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心。
  2. 达到任务触发条件,调度中心下发任务
  3. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
  4. 执行器消费内存队列中的执行结果,主动上报给调度中心
  5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

5.搭建XXL-JOB —— 调度中心

5.1 下载与查看XXL-JOB

🏠 下载 XXL-JOB:

我们这里使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1

使用IDEA打开解压后的目录:

  • xxl-job-admin:调度中心
  • xxl-job-core:公共依赖
  • xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
  1. xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
  2. xxl-job-executor-sample-frameless:无框架版本;
  • doc :文档资料,包含数据库脚本

5.2 创建数据库表

打开脚本,全选执行即可。

⚠️ 注意事项:

之后我们在访问调度中心时,需要登录用户名和密码,默认为:

  • 用户名:admin
  • 密码:123456

这个信息在数据库的 xxl_job_user 进行保存和登录验证:

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
27天前
|
Java 数据库
在Java中使用Seata框架实现分布式事务的详细步骤
通过以上步骤,利用 Seata 框架可以实现较为简单的分布式事务处理。在实际应用中,还需要根据具体业务需求进行更详细的配置和处理。同时,要注意处理各种异常情况,以确保分布式事务的正确执行。
|
27天前
|
消息中间件 Java Kafka
在Java中实现分布式事务的常用框架和方法
总之,选择合适的分布式事务框架和方法需要综合考虑业务需求、性能、复杂度等因素。不同的框架和方法都有其特点和适用场景,需要根据具体情况进行评估和选择。同时,随着技术的不断发展,分布式事务的解决方案也在不断更新和完善,以更好地满足业务的需求。你还可以进一步深入研究和了解这些框架和方法,以便在实际应用中更好地实现分布式事务管理。
|
4天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
21 2
|
26天前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
46 6
|
26天前
|
数据库
如何在Seata框架中配置分布式事务的隔离级别?
总的来说,配置分布式事务的隔离级别是实现分布式事务管理的重要环节之一,需要认真对待和仔细调整,以满足业务的需求和性能要求。你还可以进一步深入研究和实践 Seata 框架的配置和使用,以更好地应对各种分布式事务场景的挑战。
28 6
|
24天前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
24 1
|
1月前
|
机器学习/深度学习 自然语言处理 并行计算
DeepSpeed分布式训练框架深度学习指南
【11月更文挑战第6天】随着深度学习模型规模的日益增大,训练这些模型所需的计算资源和时间成本也随之增加。传统的单机训练方式已难以应对大规模模型的训练需求。
134 3
|
1月前
|
机器学习/深度学习 并行计算 Java
谈谈分布式训练框架DeepSpeed与Megatron
【11月更文挑战第3天】随着深度学习技术的不断发展,大规模模型的训练需求日益增长。为了应对这种需求,分布式训练框架应运而生,其中DeepSpeed和Megatron是两个备受瞩目的框架。本文将深入探讨这两个框架的背景、业务场景、优缺点、主要功能及底层实现逻辑,并提供一个基于Java语言的简单demo例子,帮助读者更好地理解这些技术。
87 2
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
52 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
53 0