SpringBoot整合XXL-JOB【05】- 任务分片

简介: 在实际业务中,批量定时任务可能因上一批任务未完成而影响业务。为解决此问题,本文介绍如何使用Xxl-job对批量任务进行分片处理,通过分片广播形式调度集群机器并行执行任务,大幅提升执行效率。具体步骤包括环境准备、添加依赖和配置、声明实体类与查询类,以及改造业务逻辑实现分片查询。测试结果显示,分片处理将两千条数据的执行时间从30秒缩短至15秒,性能提升显著。

在实际业务场景中,我们可能会碰到需要短时间批量执行的定时任务,此时如果我们不做任何处理,只是像前面那样去执行,可能出现上一批的任务还没执行完,定时的时间就到了,那么势必会对业务产生影响,所以就需要我们对批量任务进行分片处理,本节就让我们看看Xxl-job如何做分片处理。

一、环境准备

1.mock数据

首先在数据库里建一个测试的表,并mock一些数据进去,模拟业务中的“大表”。因为本身就是在模拟,也不讲究性能什么的,大家自己操作下就行。

2.添加依赖

依赖也没什么好讲究的,大家根据自己的喜好添加,只要能确保可以正常连接上数据库就行,如下:

<!--MyBatis驱动-->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<!--连接池-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
AI 代码解读

3.添加配置

别忘了配置数据库的连接信息,如下:

spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=123456
AI 代码解读

4.声明实体类和查询类

根据自己实际模拟的数据声明对应的实体类和查询类,确保能查到数据就行,这步也不细说,每个人的实体和选择的ORM框架都不一样。

二、分片与不分片

1.不使用分片

首先,我们来看看这段代码:

@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
   
    List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
    System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
    Long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item->{
   
        try {
   
            //模拟发送短信动作
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }
    });
    System.out.println("任务结束时间:"+new Date());
    System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}
AI 代码解读

具体的mapper如下:

@Mapper
public interface UserMobilePlanMapper {
   

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
AI 代码解读

通过查询用户表模拟发短信的场景,我们启动项目执行一下这个任务看看需要多久完成,如下:

01.png

可以看到,我本地模拟了两千条数据测试的结果:用时30秒左右,那么假如在实际业务场景中,这个任务需要我们每隔20秒就执行一次,这样是不是就出现问题了,所以就需要我们对批量任务进行切片处理,使得它们可以被分为好几组并行执行,缩短执行耗时。

2.使用分片

区别于上面的案例,如果采用分片广播的形式,那么这一次任务的执行将会调度集群里的所有机器(也就是执行器)进行执行,从而大大加快执行速度。因此,我们需要在原来的业务逻辑基础上加上分片参数,这个分片参数也不需要我们自己去写,通过XXL-JOB提供的工具类就可以拿到,如下:

int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();
AI 代码解读

这里的shardTotalshardIndex分别对应分片总数和分片任务的索引,这么说可能有的同学还是不太理解。我们换种描述,shardTotal就是执行器的个数(Ps:也就是我们机器的个数),而shardIndex则可以对应到我们具体查询里的数据的id。因此,我们这里改造原查询的思路则是通过shardTotalshardIndex对id进行求模运算,如下:

@Mapper
public interface UserMobilePlanMapper {
   

    @Select("select * from t_user_mobile_plan where mod(id, #{shardingTotal})=#{shardingIndex}")
    List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}
AI 代码解读

然后再对接口进行小小的改造,根据执行器的个数来决定是否采用分片的方式查询,如下:

@XxlJob("sendMsgHandler1")
public void sendMsgHandler1() throws Exception{
   
    System.out.println("任务开始时间" + new Date());
    int shardTotal = XxlJobHelper.getShardTotal();
    int shardIndex = XxlJobHelper.getShardIndex();
    List<UserMobilePlan> userMobilePlans = null;
    if (shardTotal == 1) {
   
        userMobilePlans= userMobilePlanMapper.selectAll();
    } else {
   
        userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex, shardTotal);
    }
    System.out.println("处理任务数量" + userMobilePlans.size());
    long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item -> {
   
        try {
   
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }
    });
    System.out.println("任务结束时间" + new Date());
    System.out.println("任务耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}
AI 代码解读

最后,在调度中心里修改这个任务的路由策略,如下:

02.png

3.分片测试

执行一次,观察控制台的输出,如下:

03.png

04.png

可以看到,原本两千条数据的查询被均匀地分配在两台机器上,并且同时开始执行。原本需要30秒执行的任务,现在只需要15秒就完成了,性能提升了一倍。

三、小结

SpringBoot整合XXL-JOB系列暂时完结了,各位读者如果在使用过程中遇到问题可以去查阅官方文档或者在评论区留言!

目录
打赏
0
12
13
0
153
分享
相关文章
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
106 9
SpringBoot整合XXL-JOB【04】-  以GLUE模式运行与执行器负载均衡策略
SpringBoot整合XXL-JOB【03】- 执行器的使用
本文介绍了如何将调度中心与项目结合,通过配置“执行器”实现定时任务控制。首先新建SpringBoot项目并引入依赖,接着配置xxl-job相关参数,如调度中心地址、执行器名称等。然后通过Java代码将执行器注册为Spring Bean,并声明测试方法使用`@XxlJob`注解。最后,在调度中心配置并启动定时任务,验证任务是否按预期执行。通过这些步骤,读者可以掌握Xxl-Job的基本使用,专注于业务逻辑的编写而无需关心定时器本身的实现。
107 10
SpringBoot整合XXL-JOB【03】-  执行器的使用
SpringBoot整合Flowable【07】- 驳回节点任务
本文通过绩效流程的业务场景,详细介绍了如何在Flowable工作流引擎中实现任务驳回功能。具体步骤包括:获取目标任务节点和当前任务节点信息,进行必要的判空和逻辑校验,调用API完成节点回退,并清理相关脏数据(如历史任务和变量)。最后通过测试验证了驳回功能的正确性,确保流程能够成功回退到指定节点并清除中间产生的冗余数据。此功能在实际业务中非常有用,能够满足上级驳回自评等需求。
115 0
SpringBoot整合Flowable【07】- 驳回节点任务
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
SpringBoot 自定义初始化任务 Runner
SpringBoot 自定义初始化任务 Runner
43 0
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的校园悬赏任务平台附带文章源码部署视频讲解等
88 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等