利用 XXL-JOB 实现灵活控制的分片处理

简介: 本文讲述了一种利用 XXL-JOB 来进行分片任务处理的方法,另外加入对执行节点数的灵活控制。

本文讲述了一种利用 XXL-JOB 来进行分片任务处理的方法,另外加入对执行节点数的灵活控制。

场景

现在一张数据表里有大量数据需要某个服务端应用来处理,要求:

  1. 能够并行处理;

  2. 能够较灵活地控制并行任务数量。

  3. 压力较均衡地分散到不同的服务器节点;

思路

因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。

根据第 1、2 点要求,本来想通过对线程池的动态配置来实现,但结合第 3 点来考虑,服务器节点数量有可能会变化,节点之间相互无感知无通信,自己在应用内实现一套调度机制可能会很复杂。

如果有现成的独立于这些服务器节点之外的调度器就好了——顺着这个思路,就想到了已经接入的分布式任务调度平台 XXL-JOB,而在阅读其 官方文档 后发现「分片广播 & 动态分片」很贴合这种场景。

图片

方案

  1. 利用 XXL-JOB 的路由策略「分片广播」来调度定时任务;

  2. 通过任务参数传入执行任务节点数量;

  3. 定时任务逻辑里,根据获取到的分片参数、执行任务节点数量,决策当前节点是否需要执行,分片查询数据并处理:

  • 如果 分片序号 > (执行任务节点数量 - 1),则当前节点不执行任务,直接返回;

  • 否则,取 分片序号执行任务节点数量 作为分片参数,查询数据并处理。

这样,我们可以实现灵活调度 [1, N] 个节点并行执行任务处理数据。

主要代码示例

JobHandler 示例:

@XxlJob("demoJobHandler")
public void execute() {
    String param = XxlJobHelper.getJobParam();
    if (StringUtils.isBlank(param)) {
        XxlJobHelper.log("任务参数为空");
        XxlJobHelper.handleFail();
        return;
    }

    // 执行任务节点数量
    int executeNodeNum = Integer.valueOf(param);
    // 分片序号
    int shardIndex = XxlJobHelper.getShardIndex();
    // 分片总数
    int shardTotal = XxlJobHelper.getShardTotal();

    if (executeNodeNum <= 0 || executeNodeNum > shardTotal) {
        XxlJobHelper.log("执行任务节点数量取值范围[1,节点总数]");
        XxlJobHelper.handleFail();
        return;
    }

    if (shardIndex > (executeNodeNum - 1)) {
        XxlJobHelper.log("当前分片 {} 无需执行", shardIndex);
        XxlJobHelper.handleSuccess();
        return;
    }

    shardTotal = executeNodeNum;

    // 分片查询数据并处理
    process(shardIndex, shardTotal);

    XxlJobHelper.handleSuccess();
}

分片查询数据示例:

select field1, field2 
from table_name 
where ... 
    and mod(id, #{shardTotal}) = #{shardIndex} 
order by id limit #{rows};

进一步思考

  1. 如果需要更大的并发量,需要有大于应用节点数量的任务并行,如何处理?

    两种思路:

  • 通过任务参数传入一个并发数,单个节点在处理任务时,将查询到的数据按这个数字进行再分片,交由线程池并行处理;

  • 配置 M 个定时任务,指定相同的 JobHandler,给它们编号 0、1、2…M,并将定时任务编号和 M 这两个数,由任务参数传入,定时任务逻辑里,先根据分片参数、定时任务编号、M,重新计算出新的分片参数,如 分片序号 = (分片序号 * M) + 定时任务编号分片总数 = 分片总数 * M,再查询数据并处理。

  1. 如果有可能频繁调整任务执行逻辑,包括可能要新增任务参数等,而不想重启服务器,如何解决?

    可以考虑使用 XXL-JOB 的「GLUE模式」任务,能够在线编辑和更新定时任务执行逻辑。

参考

  • 分布式任务调度平台XXL-JOB
目录
相关文章
|
Java 调度 数据库
SpringBoot整合XXL-JOB【05】- 任务分片
在实际业务中,批量定时任务可能因上一批任务未完成而影响业务。为解决此问题,本文介绍如何使用Xxl-job对批量任务进行分片处理,通过分片广播形式调度集群机器并行执行任务,大幅提升执行效率。具体步骤包括环境准备、添加依赖和配置、声明实体类与查询类,以及改造业务逻辑实现分片查询。测试结果显示,分片处理将两千条数据的执行时间从30秒缩短至15秒,性能提升显著。
1957 13
SpringBoot整合XXL-JOB【05】-  任务分片
|
3月前
|
Java 调度
什么是分片广播任务
本文介绍XXL-JOB的分片广播机制,通过集群执行器动态分片处理任务。调度中心为每个执行器分配分片参数,实现任务并行处理,提升效率。适用于大数据量分布式场景,支持动态扩容,每台机器处理部分数据,显著降低耗时。开发时可通过`getShardIndex()`和`getShardTotal()`获取分片信息,灵活控制业务逻辑。
|
8月前
|
消息中间件 缓存 监控
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
MQ消息积压 / Rocketmq 积压 最全的处理方案。 (秒懂+图解+史上最全)
|
人工智能 分布式计算 Java
说说XXLJob分片任务实现原理?
说说XXLJob分片任务实现原理?
2436 0
说说XXLJob分片任务实现原理?
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
1868 0
|
算法 调度 微服务
【黑马头条】day20—xxl-job(三)
【黑马头条】day20—xxl-job(三)
641 0
|
XML Java 数据库
"揭秘!Spring Boot日志链路追踪大法,让你的调试之路畅通无阻,效率飙升,问题无所遁形!"
【8月更文挑战第11天】在微服务架构中,请求可能跨越多个服务与组件,传统日志记录难以全局追踪问题。本文以电商系统为例,介绍如何手动实现Spring Boot应用的日志链路追踪。通过为每个请求生成唯一追踪ID并贯穿全链路,在服务间传递该ID,并在日志中记录,即使日志分散也能通过ID串联。提供了实现这一机制所需的关键代码片段,包括使用过滤器设置追踪ID、业务代码中的日志记录及Logback配置。此方案显著提升了问题定位的效率,适用于基于Spring Boot构建的微服务环境。
952 4
|
存储 监控 算法
XXL-JOB内部机制大揭秘:让任务调度飞起来
【8月更文挑战第14天】在大数据时代,高效的任务调度系统是支撑业务稳定运行与快速迭代的基石。XXL-JOB,作为一款轻量级、分布式任务调度平台,凭借其灵活的配置、强大的扩展性和高可用特性,在众多任务调度框架中脱颖而出。今天,我们就来深入揭秘XXL-JOB的内部机制,看看它是如何让任务调度“飞起来”的。
1148 0
|
存储 缓存 NoSQL
解析Java中的缓存机制及其实现方式
解析Java中的缓存机制及其实现方式
|
关系型数据库 分布式数据库 数据库
【阿里云云原生专栏】云原生时代的数据库选型:阿里云RDS与PolarDB对比分析
【5月更文挑战第24天】阿里云提供RDS和PolarDB两种数据库服务。RDS是高性能的在线关系型数据库,支持MySQL等引擎,适合中小规模需求;而PolarDB是分布式数据库,具备高扩展性和性能,适用于大规模数据和高并发场景。RDS与PolarDB在架构、性能、弹性伸缩、成本等方面存在差异,开发者应根据具体需求选择。示例代码展示了如何通过CLI创建RDS和PolarDB实例。
2271 0

热门文章

最新文章