【RuoYi-Eggjs】:基于 Bull Queue 的企业级定时任务调度系统

简介: 【RuoYi-Eggjs】基于 Bull Queue 与 Redis 实现企业级定时任务调度系统,支持动态管理、分布式执行、失败重试、日志监控及手动触发。集成 Cron 表达式解析、任务安全校验与执行统计,适用于数据同步、报表生成等场景,助力高可用任务调度。项目开源,易于扩展。

【RuoYi-Eggjs】:基于 Bull Queue 的企业级定时任务调度系统

本文介绍 RuoYi-Eggjs 中基于 Bull Queue 构建的企业级定时任务调度系统,涵盖动态任务管理、分布式执行、失败重试、日志监控等核心功能的设计与实现。

一、引言:为什么需要强大的定时任务系统?

在企业级应用中,定时任务是不可或缺的基础设施:

  • 数据统计报表:每日/每周/每月的业务数据汇总
  • 系统维护:日志清理、缓存刷新、数据备份
  • 业务提醒:订单超时处理、会员到期通知
  • 数据同步:不同系统间的数据同步任务

RuoYi-Eggjs 基于 Bull Queue 构建了一套完整的定时任务调度系统,具备以下企业级特性:

  • 🔄 动态任务管理 - 从数据库读取 cron 表达式,支持热更新
  • 🚀 分布式执行 - 基于 Redis 的分布式任务调度
  • 🔁 失败重试 - 自动重试机制,保证任务执行可靠性
  • 📊 可视化监控 - 完整的任务执行日志和状态监控
  • 手动执行 - 支持立即执行任务,方便测试调试

二、系统架构设计

整体架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   管理端 Web     │    │   Bull Queue    │    │   任务执行器     │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 任务管理API  │ │───▶│ │ Redis 队列   │ │───▶│ │ ryTask 处理器│ │
│ │ - 增删改查   │ │    │ │ - 任务调度   │ │    │ │ - 任务执行   │ │
│ │ - 启停控制   │ │    │ │ - 失败重试   │ │    │ │ - 结果记录   │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────────────────────────────────────────────────────┐
│                     数据存储层                                   │
│  ┌─────────────────┐            ┌─────────────────────────────┐ │
│  │     MySQL       │            │          Redis              │ │
│  │ - sys_job 表    │            │ - 队列数据                   │ │
│  │ - sys_job_log表 │            │ - 分布式锁                   │ │
│  └─────────────────┘            └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

核心组件

组件 职责 实现文件
JobService 任务管理、调度控制 app/service/monitor/job.js
Bull Queue 分布式任务队列 app/queue/ryTask.js
CronUtils Cron 表达式工具 app/util/cronUtils.js
JobLog 任务执行日志 app/service/monitor/jobLog.js
RyTask 任务执行器 app/service/ryTask.js

三、Cron 表达式处理

智能 Cron 工具类

系统内置了强大的 Cron 表达式处理工具,支持表达式验证、下次执行时间计算等功能:

// app/util/cronUtils.js
const parser = require('cron-parser');

class CronUtils {
   
  // 验证 cron 表达式是否有效
  static isValid(cronExpression) {
   
    try {
   
      parser.parseExpression(cronExpression);
      return true;
    } catch (err) {
   
      return false;
    }
  }

  // 获取下次执行时间
  static getNextExecution(cronExpression) {
   
    try {
   
      const interval = parser.parseExpression(cronExpression);
      return interval.next().toDate();
    } catch (err) {
   
      return null;
    }
  }
}

常用 Cron 表达式

系统预置了常用的 Cron 表达式模板:

static get EXAMPLES() {
   
  return {
   
    EVERY_MINUTE: '0 * * * * *',           // 每分钟执行
    EVERY_HOUR: '0 0 * * * *',             // 每小时执行
    EVERY_DAY: '0 0 0 * * *',              // 每天 0 点执行
    WORKDAY_MORNING: '0 0 9 * * 1-5',      // 工作日早上 9 点
    EVERY_WEEK: '0 0 0 * * 0',             // 每周日 0 点执行
  };
}

四、Bull Queue 队列处理

队列初始化

基于 Redis 创建分布式任务队列:

// app/queue/ryTask.js
const Queue = require('bull');

module.exports = app => {
   
  const queue = new Queue('ryTask', {
   
    redis: {
   
      port: app.config.bull.client.port,
      host: app.config.bull.client.host,
      password: app.config.bull.client.password,
      db: app.config.bull.client.db,
    },
  });

  // 配置任务处理器
  queue.process(async (job) => {
   
    const ctx = app.createAnonymousContext();
    const {
    invokeTarget, jobInfo } = job.data;

    // 执行任务逻辑...
  });

  return queue;
};

任务执行流程

每个任务的执行都经过完整的生命周期管理:

queue.process(async (job) => {
   
  const startTime = Date.now();
  let status = '0'; // 0-成功 1-失败
  let jobMessage = '';
  let exceptionInfo = '';

  try {
   
    // 1. 记录开始执行
    ctx.logger.info(`[Bull] 开始执行任务: ${
     jobInfo.jobName}`);

    // 2. 调用任务执行器
    const result = await ctx.service.ryTask.execute(invokeTarget);

    jobMessage = result.message || '任务执行成功';
    ctx.logger.info(`[Bull] 任务执行成功: ${
     jobInfo.jobName}`);
  } catch (err) {
   
    status = '1';
    jobMessage = '任务执行失败';
    exceptionInfo = err.message;

    ctx.logger.error(`[Bull] 任务执行失败: ${
     jobInfo.jobName}`, err);
    throw err; // 抛出错误,让 Bull 处理重试
  } finally {
   
    const duration = Date.now() - startTime;

    // 3. 记录执行日志
    await ctx.service.monitor.jobLog.insertJobLog({
   
      jobName: jobInfo.jobName,
      jobGroup: jobInfo.jobGroup,
      invokeTarget: invokeTarget,
      jobMessage: `${
     jobMessage} (耗时: ${
     duration}ms)`,
      status,
      exceptionInfo: exceptionInfo.substring(0, 2000),
      createTime: ctx.helper.formatDate(new Date(startTime)),
    });
  }
});

五、动态任务管理

创建定时任务

系统支持动态创建定时任务,任务信息存储在数据库中:

// app/service/monitor/job.js
async createBullJob(job) {
   
  const {
    app, ctx } = this;

  try {
   
    // 使用 jobId + invokeTarget 作为唯一标识
    const uniqueId = `${
     job.jobId}:${
     job.invokeTarget}`;

    // 清理可能存在的旧任务
    await this.cleanOldRepeatableJobs(job);

    // 添加新的重复任务
    await app.queue.ryTask.add(
      {
   
        invokeTarget: job.invokeTarget,
        jobInfo: {
   
          jobId: job.jobId,
          jobName: job.jobName,
          jobGroup: job.jobGroup,
          uniqueId,
        },
      },
      {
   
        jobId: uniqueId,
        repeat: {
   
          cron: job.cronExpression,
          key: uniqueId,
        },
        removeOnComplete: true,
        removeOnFail: 100,
      }
    );

    ctx.logger.info(`[Bull] 创建定时任务成功: ${
     job.jobName}`);
    return true;
  } catch (err) {
   
    ctx.logger.error(`[Bull] 创建定时任务失败: ${
     job.jobName}`, err);
    return false;
  }
}

任务状态控制

支持任务的启动、暂停、删除等状态控制:

// 暂停任务
async pauseBullJob(job) {
   
  return await this.deleteBullJob(job);
}

// 恢复任务
async resumeBullJob(job) {
   
  return await this.createBullJob(job);
}

// 立即执行任务
async runBullJob(job) {
   
  await app.queue.ryTask.add(
    {
   
      invokeTarget: job.invokeTarget,
      jobInfo: job,
    },
    {
   
      removeOnComplete: true,
      priority: 1, // 高优先级立即执行
    }
  );

  return true;
}

六、任务执行与监控

任务执行日志

系统完整记录每次任务执行的详细日志:

// app/service/monitor/jobLog.js
async insertJobLog(jobLog) {
   
  const {
    ctx } = this;

  try {
   
    const mapper = ctx.helper.getDB(ctx).sysJobLogMapper;

    // 设置日志ID和创建时间
    jobLog.jobLogId = ctx.helper.generateId();
    jobLog.createTime = jobLog.createTime || ctx.helper.formatDate(new Date());

    return await mapper.insertJobLog([], jobLog);
  } catch (err) {
   
    ctx.logger.error('插入任务执行日志失败:', err);
    throw err;
  }
}

日志查询与统计

提供丰富的日志查询功能,支持按任务名、执行状态、时间范围等条件筛选:

async selectJobLogPage(params = {
   }) {
   
  const {
    ctx } = this;
  const mapper = ctx.helper.getDB(ctx).sysJobLogMapper;

  return await ctx.helper.pageQuery(
    mapper.selectJobLogListMapper([], params),
    params,
    mapper.db()
  );
}

七、企业级特性

1. 分布式支持

基于 Redis 实现分布式任务调度,支持多实例部署:

  • 任务分发:Redis 作为消息队列,确保任务在集群中合理分发
  • 状态同步:任务状态和执行结果通过 Redis 在各实例间同步
  • 防重复执行:通过 Redis 锁机制防止同一任务重复执行

2. 失败重试机制

Bull Queue 内置强大的失败重试功能:

// 配置重试策略
{
   
  attempts: 3,                    // 重试次数
  backoff: {
   
    type: 'exponential',          // 指数退避
    delay: 2000,                  // 初始延迟
  },
  removeOnFail: 100,              // 保留失败任务数量
}

3. 性能监控

系统提供完整的性能监控指标:

  • 执行时长统计:记录每次任务执行的详细耗时
  • 成功/失败率:统计任务执行的成功率
  • 队列状态监控:监控队列积压情况

4. 安全控制

严格的任务执行安全控制:

// 任务安全校验
checkJobSecurity(job) {
   
  const securityCheck = {
    pass: true, message: '' };

  // 检查调用目标是否安全
  if (job.invokeTarget && !job.invokeTarget.match(/^[\w\.]+(\(.*\))?$/)) {
   
    securityCheck.pass = false;
    securityCheck.message = '调用目标格式不正确';
  }

  return securityCheck;
}

八、最佳实践

1. 任务设计原则

  • 幂等性:确保任务可以安全重复执行
  • 异常处理:完善的错误处理和日志记录
  • 资源管控:避免长时间占用系统资源

2. Cron 表达式优化

// 推荐的 Cron 表达式
'0 0 2 * * *'          // 每天凌晨2点(避开高峰期)
'0 0 9-17 * * 1-5'     // 工作日工作时间
'0 */5 * * * *'        // 每5分钟(适度频率)

// 避免的表达式
'* * * * * *'          // 每秒执行(过于频繁)
'0 0 0 * * *'          // 午夜12点(高峰期)

3. 监控告警

建议配置任务执行监控告警:

// 监控失败任务
queue.on('failed', (job, err) => {
   
  // 发送告警通知
  notificationService.sendAlert({
   
    title: '任务执行失败',
    content: `任务 ${
     job.data.jobInfo.jobName} 执行失败: ${
     err.message}`,
    level: 'error'
  });
});

九、总结

RuoYi-Eggjs 的定时任务调度系统通过以下技术栈实现了企业级的任务管理能力:

核心优势

  1. 技术先进:基于 Bull Queue + Redis 的成熟方案
  2. 功能完整:涵盖任务的全生命周期管理
  3. 高可用性:分布式部署 + 失败重试保证系统稳定
  4. 易于使用:直观的管理界面 + 丰富的 API

目录
相关文章
|
5天前
|
存储 JavaScript 前端开发
JavaScript基础
本节讲解JavaScript基础核心知识:涵盖值类型与引用类型区别、typeof检测类型及局限性、===与==差异及应用场景、内置函数与对象、原型链五规则、属性查找机制、instanceof原理,以及this指向和箭头函数中this的绑定时机。重点突出类型判断、原型继承与this机制,助力深入理解JS面向对象机制。(238字)
|
4天前
|
云安全 人工智能 安全
阿里云2026云上安全健康体检正式开启
新年启程,来为云上环境做一次“深度体检”
1605 6
|
6天前
|
安全 数据可视化 网络安全
安全无小事|阿里云先知众测,为企业筑牢防线
专为企业打造的漏洞信息收集平台
1330 2
|
1天前
|
消息中间件 人工智能 Kubernetes
阿里云云原生应用平台岗位急招,加入我们,打造 AI 最强基础设施
云原生应用平台作为中国最大云计算公司的基石,现全面转向 AI,打造 AI 时代最强基础设施。寻找热爱技术、具备工程极致追求的架构师、极客与算法专家,共同重构计算、定义未来。杭州、北京、深圳、上海热招中,让我们一起在云端,重构 AI 的未来。
|
6天前
|
缓存 算法 关系型数据库
深入浅出分布式 ID 生成方案:从原理到业界主流实现
本文深入探讨分布式ID的生成原理与主流解决方案,解析百度UidGenerator、滴滴TinyID及美团Leaf的核心设计,涵盖Snowflake算法、号段模式与双Buffer优化,助你掌握高并发下全局唯一ID的实现精髓。
353 160
|
6天前
|
人工智能 自然语言处理 API
n8n:流程自动化、智能化利器
流程自动化助你在重复的业务流程中节省时间,可通过自然语言直接创建工作流啦。
427 6
n8n:流程自动化、智能化利器
|
7天前
|
人工智能 API 开发工具
Skills比MCP更重要?更省钱的多!Python大佬这观点老金测了一周终于懂了
加我进AI学习群,公众号右下角“联系方式”。文末有老金开源知识库·全免费。本文详解Claude Skills为何比MCP更轻量高效:极简配置、按需加载、省90% token,适合多数场景。MCP仍适用于复杂集成,但日常任务首选Skills。推荐先用SKILL.md解决,再考虑协议。附实测对比与配置建议,助你提升效率,节省精力。关注老金,一起玩转AI工具。
|
14天前
|
机器学习/深度学习 安全 API
MAI-UI 开源:通用 GUI 智能体基座登顶 SOTA!
MAI-UI是通义实验室推出的全尺寸GUI智能体基座模型,原生集成用户交互、MCP工具调用与端云协同能力。支持跨App操作、模糊语义理解与主动提问澄清,通过大规模在线强化学习实现复杂任务自动化,在出行、办公等高频场景中表现卓越,已登顶ScreenSpot-Pro、MobileWorld等多项SOTA评测。
1571 7
|
4天前
|
Linux 数据库
Linux 环境 Polardb-X 数据库 单机版 rpm 包 安装教程
本文介绍在CentOS 7.9环境下安装PolarDB-X单机版数据库的完整流程,涵盖系统环境准备、本地Yum源配置、RPM包安装、用户与目录初始化、依赖库解决、数据库启动及客户端连接等步骤,助您快速部署运行PolarDB-X。
254 1
Linux 环境 Polardb-X 数据库 单机版 rpm 包 安装教程
|
9天前
|
人工智能 前端开发 API
Google发布50页AI Agent白皮书,老金帮你提炼10个核心要点
老金分享Google最新AI Agent指南:让AI从“动嘴”到“动手”。Agent=大脑(模型)+手(工具)+协调系统,可自主完成任务。通过ReAct模式、多Agent协作与RAG等技术,实现真正自动化。入门推荐LangChain,文末附开源知识库链接。
690 119