SpringBoot整合XXL-JOB【05】- 任务分片

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 在实际业务中,批量定时任务可能因执行时间过长影响业务。本文介绍如何使用Xxl-job对批量任务进行分片处理,通过分片广播形式调度集群机器并行执行,显著缩短任务耗时。具体步骤包括环境准备、不使用分片的代码示例、使用分片的改造方法及测试结果,展示了分片处理带来的性能提升。

在实际业务场景中,我们可能会碰到需要短时间批量执行的定时任务,此时如果我们不做任何处理,只是像前面那样去执行,可能出现上一批的任务还没执行完,定时的时间就到了,那么势必会对业务产生影响,所以就需要我们对批量任务进行分片处理,本节就让我们看看Xxl-job如何做分片处理。

一、环境准备

1.mock数据

首先在数据库里建一个测试的表,并mock一些数据进去,模拟业务中的“大表”。因为本身就是在模拟,也不讲究性能什么的,大家自己操作下就行。

2.添加依赖

依赖也没什么好讲究的,大家根据自己的喜好添加,只要能确保可以正常连接上数据库就行,如下:

<!--MyBatis驱动-->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<!--连接池-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
AI 代码解读

3.添加配置

别忘了配置数据库的连接信息,如下:

spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=123456
AI 代码解读

4.声明实体类和查询类

根据自己实际模拟的数据声明对应的实体类和查询类,确保能查到数据就行,这步也不细说,每个人的实体和选择的ORM框架都不一样。

二、分片与不分片

1.不使用分片

首先,我们来看看这段代码:

@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
   
    List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
    System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
    Long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item->{
   
        try {
   
            //模拟发送短信动作
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }
    });
    System.out.println("任务结束时间:"+new Date());
    System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
AI 代码解读

具体的mapper如下:

@Mapper
public interface UserMobilePlanMapper {
   

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
AI 代码解读

通过查询用户表模拟发短信的场景,我们启动项目执行一下这个任务看看需要多久完成,如下:
01.png
可以看到,我本地模拟了两千条数据测试的结果:用时30秒左右,那么假如在实际业务场景中,这个任务需要我们每隔20秒就执行一次,这样是不是就出现问题了,所以就需要我们对批量任务进行切片处理,使得它们可以被分为好几组并行执行,缩短执行耗时。

2.使用分片

区别于上面的案例,如果采用分片广播的形式,那么这一次任务的执行将会调度集群里的所有机器(也就是执行器)进行执行,从而大大加快执行速度。因此,我们需要在原来的业务逻辑基础上加上分片参数,这个分片参数也不需要我们自己去写,通过XXL-JOB提供的工具类就可以拿到,如下:

int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();
AI 代码解读

这里的shardTotalshardIndex分别对应分片总数和分片任务的索引,这么说可能有的同学还是不太理解。我们换种描述,shardTotal就是执行器的个数(Ps:也就是我们机器的个数),而shardIndex则可以对应到我们具体查询里的数据的id。因此,我们这里改造原查询的思路则是通过shardTotalshardIndex对id进行求模运算,如下:

@Mapper
public interface UserMobilePlanMapper {
   

    @Select("select * from t_user_mobile_plan where mod(id, #{shardingTotal})=#{shardingIndex}")
    List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
AI 代码解读

然后再对接口进行小小的改造,根据执行器的个数来决定是否采用分片的方式查询,如下:

@XxlJob("sendMsgHandler1")
public void sendMsgHandler1() throws Exception{
   
    System.out.println("任务开始时间" + new Date());
    int shardTotal = XxlJobHelper.getShardTotal();
    int shardIndex = XxlJobHelper.getShardIndex();
    List<UserMobilePlan> userMobilePlans = null;
    if (shardTotal == 1) {
   
        userMobilePlans= userMobilePlanMapper.selectAll();
    } else {
   
        userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex, shardTotal);
    }
    System.out.println("处理任务数量" + userMobilePlans.size());
    long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item -> {
   
        try {
   
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    });
    System.out.println("任务结束时间" + new Date());
    System.out.println("任务耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}
AI 代码解读

最后,在调度中心里修改这个任务的路由策略,如下:
02.png

3.分片测试

执行一次,观察控制台的输出,如下:
03.png

04.png
可以看到,原本两千条数据的查询被均匀地分配在两台机器上,并且同时开始执行。原本需要30秒执行的任务,现在只需要15秒就完成了,性能提升了一倍。

三、小结

SpringBoot整合XXL-JOB系列暂时完结了,各位读者如果在使用过程中遇到问题可以去查阅官方文档或者在评论区留言!

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
12
13
0
73
分享
相关文章
SpringBoot整合XXL-JOB【03】- 执行器的使用
本文介绍了如何将调度中心与项目结合,通过配置“执行器”实现定时任务控制。首先新建SpringBoot项目并引入依赖,接着配置xxl-job相关参数,如调度中心地址、执行器名称等。然后通过Java代码将执行器注册为Spring Bean,并声明测试方法使用`@XxlJob`注解。最后,在调度中心配置并启动定时任务,验证任务是否按预期执行。通过这些步骤,读者可以掌握Xxl-Job的基本使用,专注于业务逻辑的编写而无需关心定时器本身的实现。
23 10
SpringBoot整合XXL-JOB【03】-  执行器的使用
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
24 9
|
3天前
|
SpringBoot整合XXL-JOB【01】- 初识XXL-JOB
XXL-JOB 是一个分布式任务调度平台,设计目标为开发迅速、学习简单、轻量级、易扩展。它解决了分布式环境下定时任务重复执行的问题,无需额外加锁,降低了维护成本。XXL-JOB 由调度中心和执行器两部分组成,前者管理任务,后者执行具体逻辑,使代码结构更清晰。适用于多机部署场景,支持统一管理任务的启停和频率调整。
25 8
SpringBoot整合XXL-JOB【02】- 启动调度中心
本文介绍了如何初始化和配置XXL-JOB调度中心。首先,从GitHub或Gitee获取源码;接着,执行`tables_xxl_job.sql`脚本初始化数据库。然后,在IDE中打开项目并修改`application.properties`中的数据库连接和`accessToken`配置。完成配置后,启动`XxlJobAdminApplication`,访问http://localhost:8080/xxl-job-admin/进行登录。最后,简要介绍了调度中心的主要功能模块,包括运行报表、任务管理、调度日志、执行器管理和用户管理。下篇将通过实例演示如何使用XXL-JOB执行定时任务。
19 6
SpringBoot整合Flowable【07】- 驳回节点任务
本文通过绩效流程的业务场景,详细介绍了如何在Flowable工作流引擎中实现任务驳回功能。具体步骤包括:获取目标任务节点和当前任务节点信息,进行必要的判空和逻辑校验,调用API完成节点回退,并清理相关脏数据(如历史任务和变量)。最后通过测试验证了驳回功能的正确性,确保流程能够成功回退到指定节点并清除中间产生的冗余数据。此功能在实际业务中非常有用,能够满足上级驳回自评等需求。
19 0
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
SpringBoot 自定义初始化任务 Runner
SpringBoot 自定义初始化任务 Runner
33 0
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
64 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等