【RuoYi-Eggjs】:基于 Bull Queue 的企业级定时任务调度系统

简介: 【RuoYi-Eggjs】基于 Bull Queue 与 Redis 实现企业级定时任务调度系统,支持动态管理、分布式执行、失败重试、日志监控及手动触发。集成 Cron 表达式解析、任务安全校验与执行统计,适用于数据同步、报表生成等场景,助力高可用任务调度。项目开源,易于扩展。

【RuoYi-Eggjs】:基于 Bull Queue 的企业级定时任务调度系统

本文介绍 RuoYi-Eggjs 中基于 Bull Queue 构建的企业级定时任务调度系统,涵盖动态任务管理、分布式执行、失败重试、日志监控等核心功能的设计与实现。

一、引言:为什么需要强大的定时任务系统?

在企业级应用中,定时任务是不可或缺的基础设施:

  • 数据统计报表:每日/每周/每月的业务数据汇总
  • 系统维护:日志清理、缓存刷新、数据备份
  • 业务提醒:订单超时处理、会员到期通知
  • 数据同步:不同系统间的数据同步任务

RuoYi-Eggjs 基于 Bull Queue 构建了一套完整的定时任务调度系统,具备以下企业级特性:

  • 🔄 动态任务管理 - 从数据库读取 cron 表达式,支持热更新
  • 🚀 分布式执行 - 基于 Redis 的分布式任务调度
  • 🔁 失败重试 - 自动重试机制,保证任务执行可靠性
  • 📊 可视化监控 - 完整的任务执行日志和状态监控
  • 手动执行 - 支持立即执行任务,方便测试调试

二、系统架构设计

整体架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   管理端 Web     │    │   Bull Queue    │    │   任务执行器     │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 任务管理API  │ │───▶│ │ Redis 队列   │ │───▶│ │ ryTask 处理器│ │
│ │ - 增删改查   │ │    │ │ - 任务调度   │ │    │ │ - 任务执行   │ │
│ │ - 启停控制   │ │    │ │ - 失败重试   │ │    │ │ - 结果记录   │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────────────────────────────────────────────────────┐
│                     数据存储层                                   │
│  ┌─────────────────┐            ┌─────────────────────────────┐ │
│  │     MySQL       │            │          Redis              │ │
│  │ - sys_job 表    │            │ - 队列数据                   │ │
│  │ - sys_job_log表 │            │ - 分布式锁                   │ │
│  └─────────────────┘            └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

核心组件

组件 职责 实现文件
JobService 任务管理、调度控制 app/service/monitor/job.js
Bull Queue 分布式任务队列 app/queue/ryTask.js
CronUtils Cron 表达式工具 app/util/cronUtils.js
JobLog 任务执行日志 app/service/monitor/jobLog.js
RyTask 任务执行器 app/service/ryTask.js

三、Cron 表达式处理

智能 Cron 工具类

系统内置了强大的 Cron 表达式处理工具,支持表达式验证、下次执行时间计算等功能:

// app/util/cronUtils.js
const parser = require('cron-parser');

class CronUtils {
   
  // 验证 cron 表达式是否有效
  static isValid(cronExpression) {
   
    try {
   
      parser.parseExpression(cronExpression);
      return true;
    } catch (err) {
   
      return false;
    }
  }

  // 获取下次执行时间
  static getNextExecution(cronExpression) {
   
    try {
   
      const interval = parser.parseExpression(cronExpression);
      return interval.next().toDate();
    } catch (err) {
   
      return null;
    }
  }
}

常用 Cron 表达式

系统预置了常用的 Cron 表达式模板:

static get EXAMPLES() {
   
  return {
   
    EVERY_MINUTE: '0 * * * * *',           // 每分钟执行
    EVERY_HOUR: '0 0 * * * *',             // 每小时执行
    EVERY_DAY: '0 0 0 * * *',              // 每天 0 点执行
    WORKDAY_MORNING: '0 0 9 * * 1-5',      // 工作日早上 9 点
    EVERY_WEEK: '0 0 0 * * 0',             // 每周日 0 点执行
  };
}

四、Bull Queue 队列处理

队列初始化

基于 Redis 创建分布式任务队列:

// app/queue/ryTask.js
const Queue = require('bull');

module.exports = app => {
   
  const queue = new Queue('ryTask', {
   
    redis: {
   
      port: app.config.bull.client.port,
      host: app.config.bull.client.host,
      password: app.config.bull.client.password,
      db: app.config.bull.client.db,
    },
  });

  // 配置任务处理器
  queue.process(async (job) => {
   
    const ctx = app.createAnonymousContext();
    const {
    invokeTarget, jobInfo } = job.data;

    // 执行任务逻辑...
  });

  return queue;
};

任务执行流程

每个任务的执行都经过完整的生命周期管理:

queue.process(async (job) => {
   
  const startTime = Date.now();
  let status = '0'; // 0-成功 1-失败
  let jobMessage = '';
  let exceptionInfo = '';

  try {
   
    // 1. 记录开始执行
    ctx.logger.info(`[Bull] 开始执行任务: ${
     jobInfo.jobName}`);

    // 2. 调用任务执行器
    const result = await ctx.service.ryTask.execute(invokeTarget);

    jobMessage = result.message || '任务执行成功';
    ctx.logger.info(`[Bull] 任务执行成功: ${
     jobInfo.jobName}`);
  } catch (err) {
   
    status = '1';
    jobMessage = '任务执行失败';
    exceptionInfo = err.message;

    ctx.logger.error(`[Bull] 任务执行失败: ${
     jobInfo.jobName}`, err);
    throw err; // 抛出错误,让 Bull 处理重试
  } finally {
   
    const duration = Date.now() - startTime;

    // 3. 记录执行日志
    await ctx.service.monitor.jobLog.insertJobLog({
   
      jobName: jobInfo.jobName,
      jobGroup: jobInfo.jobGroup,
      invokeTarget: invokeTarget,
      jobMessage: `${
     jobMessage} (耗时: ${
     duration}ms)`,
      status,
      exceptionInfo: exceptionInfo.substring(0, 2000),
      createTime: ctx.helper.formatDate(new Date(startTime)),
    });
  }
});

五、动态任务管理

创建定时任务

系统支持动态创建定时任务,任务信息存储在数据库中:

// app/service/monitor/job.js
async createBullJob(job) {
   
  const {
    app, ctx } = this;

  try {
   
    // 使用 jobId + invokeTarget 作为唯一标识
    const uniqueId = `${
     job.jobId}:${
     job.invokeTarget}`;

    // 清理可能存在的旧任务
    await this.cleanOldRepeatableJobs(job);

    // 添加新的重复任务
    await app.queue.ryTask.add(
      {
   
        invokeTarget: job.invokeTarget,
        jobInfo: {
   
          jobId: job.jobId,
          jobName: job.jobName,
          jobGroup: job.jobGroup,
          uniqueId,
        },
      },
      {
   
        jobId: uniqueId,
        repeat: {
   
          cron: job.cronExpression,
          key: uniqueId,
        },
        removeOnComplete: true,
        removeOnFail: 100,
      }
    );

    ctx.logger.info(`[Bull] 创建定时任务成功: ${
     job.jobName}`);
    return true;
  } catch (err) {
   
    ctx.logger.error(`[Bull] 创建定时任务失败: ${
     job.jobName}`, err);
    return false;
  }
}

任务状态控制

支持任务的启动、暂停、删除等状态控制:

// 暂停任务
async pauseBullJob(job) {
   
  return await this.deleteBullJob(job);
}

// 恢复任务
async resumeBullJob(job) {
   
  return await this.createBullJob(job);
}

// 立即执行任务
async runBullJob(job) {
   
  await app.queue.ryTask.add(
    {
   
      invokeTarget: job.invokeTarget,
      jobInfo: job,
    },
    {
   
      removeOnComplete: true,
      priority: 1, // 高优先级立即执行
    }
  );

  return true;
}

六、任务执行与监控

任务执行日志

系统完整记录每次任务执行的详细日志:

// app/service/monitor/jobLog.js
async insertJobLog(jobLog) {
   
  const {
    ctx } = this;

  try {
   
    const mapper = ctx.helper.getDB(ctx).sysJobLogMapper;

    // 设置日志ID和创建时间
    jobLog.jobLogId = ctx.helper.generateId();
    jobLog.createTime = jobLog.createTime || ctx.helper.formatDate(new Date());

    return await mapper.insertJobLog([], jobLog);
  } catch (err) {
   
    ctx.logger.error('插入任务执行日志失败:', err);
    throw err;
  }
}

日志查询与统计

提供丰富的日志查询功能,支持按任务名、执行状态、时间范围等条件筛选:

async selectJobLogPage(params = {
   }) {
   
  const {
    ctx } = this;
  const mapper = ctx.helper.getDB(ctx).sysJobLogMapper;

  return await ctx.helper.pageQuery(
    mapper.selectJobLogListMapper([], params),
    params,
    mapper.db()
  );
}

七、企业级特性

1. 分布式支持

基于 Redis 实现分布式任务调度,支持多实例部署:

  • 任务分发:Redis 作为消息队列,确保任务在集群中合理分发
  • 状态同步:任务状态和执行结果通过 Redis 在各实例间同步
  • 防重复执行:通过 Redis 锁机制防止同一任务重复执行

2. 失败重试机制

Bull Queue 内置强大的失败重试功能:

// 配置重试策略
{
   
  attempts: 3,                    // 重试次数
  backoff: {
   
    type: 'exponential',          // 指数退避
    delay: 2000,                  // 初始延迟
  },
  removeOnFail: 100,              // 保留失败任务数量
}

3. 性能监控

系统提供完整的性能监控指标:

  • 执行时长统计:记录每次任务执行的详细耗时
  • 成功/失败率:统计任务执行的成功率
  • 队列状态监控:监控队列积压情况

4. 安全控制

严格的任务执行安全控制:

// 任务安全校验
checkJobSecurity(job) {
   
  const securityCheck = {
    pass: true, message: '' };

  // 检查调用目标是否安全
  if (job.invokeTarget && !job.invokeTarget.match(/^[\w\.]+(\(.*\))?$/)) {
   
    securityCheck.pass = false;
    securityCheck.message = '调用目标格式不正确';
  }

  return securityCheck;
}

八、最佳实践

1. 任务设计原则

  • 幂等性:确保任务可以安全重复执行
  • 异常处理:完善的错误处理和日志记录
  • 资源管控:避免长时间占用系统资源

2. Cron 表达式优化

// 推荐的 Cron 表达式
'0 0 2 * * *'          // 每天凌晨2点(避开高峰期)
'0 0 9-17 * * 1-5'     // 工作日工作时间
'0 */5 * * * *'        // 每5分钟(适度频率)

// 避免的表达式
'* * * * * *'          // 每秒执行(过于频繁)
'0 0 0 * * *'          // 午夜12点(高峰期)

3. 监控告警

建议配置任务执行监控告警:

// 监控失败任务
queue.on('failed', (job, err) => {
   
  // 发送告警通知
  notificationService.sendAlert({
   
    title: '任务执行失败',
    content: `任务 ${
     job.data.jobInfo.jobName} 执行失败: ${
     err.message}`,
    level: 'error'
  });
});

九、总结

RuoYi-Eggjs 的定时任务调度系统通过以下技术栈实现了企业级的任务管理能力:

核心优势

  1. 技术先进:基于 Bull Queue + Redis 的成熟方案
  2. 功能完整:涵盖任务的全生命周期管理
  3. 高可用性:分布式部署 + 失败重试保证系统稳定
  4. 易于使用:直观的管理界面 + 丰富的 API

目录
相关文章
|
1月前
|
人工智能 安全 API
Nacos 安全护栏:MCP、Agent、配置全维防护,重塑 AI Registry 安全边界
Nacos安全新标杆:精细鉴权、无感灰度、全量审计!
680 66
|
25天前
|
数据采集 监控 数据可视化
快速上手:LangChain + AgentRun 浏览器沙箱极简集成指南
AgentRun Browser Sandbox 是基于云原生函数计算的浏览器沙箱服务,为 AI Agent 提供安全、免运维的浏览器环境。通过 Serverless 架构与 CDP 协议支持,实现网页抓取、自动化操作等能力,并结合 VNC 实时可视化,助力大模型“上网”交互。
437 43
|
存储 机器学习/深度学习 缓存
Hybrid Model Support:阿里云 Tair 联合 SGLang对 Mamba-Transformer 等混合架构模型的支持方案
阿里云 Tair KVCache 联合 SGLang,创新支持 Mamba-Transformer 等混合架构模型。通过双池内存、状态快照等技术,解决异构状态管理难题,实现前缀缓存与推测解码,显著提升 Qwen3-Next 等模型的推理效率,推动大模型迈向高效智能体时代。
|
26天前
|
NoSQL Java API
【RuoYi-SpringBoot3-Pro】:Magic API 低代码开发
RuoYi-SpringBoot3-Pro 集成 Magic API,实现低代码快速开发。通过 Web 界面编写脚本,无需编写 Controller、Service 等代码,支持实时生效、数据库操作、多数据源、权限校验与 Redis 缓存,助力高效构建 RESTful 接口,适用于原型开发、报表查询等场景。
373 0
|
1月前
|
人工智能 运维 API
n1n:从替代 LiteLLM API Proxy 自建网关到企业级 AI 大模型 LLM API 统一架构的进阶之路
在 2026 年的大模型应用开发中,如何统一管理 GPT-5、Claude 4.5、Gemini 3 pro 等异构 AI 大模型 LLM API 成为企业的核心痛点。本文将深度解析开源网关 LiteLLM 的技术原理与实施路径,剖析自建网关在生产环境中的“隐形深坑”,并探讨如何通过 n1n.ai 等企业级聚合架构实现从“可用”到“高可用”的跨越。
331 9
|
23天前
|
人工智能 安全 调度
AI工程vs传统工程 —「道法术」中的变与不变
本文从“道、法、术”三个层面对比AI工程与传统软件工程的异同,指出AI工程并非推倒重来,而是在传统工程坚实基础上,为应对大模型带来的不确定性(如概率性输出、幻觉、高延迟等)所进行的架构升级:在“道”上,从追求绝对正确转向管理概率预期;在“法”上,延续分层解耦、高可用等原则,但建模重心转向上下文工程与不确定性边界控制;在“术”上,融合传统工程基本功与AI新工具(如Context Engineering、轨迹可视化、多维评估体系),最终以确定性架构驾驭不确定性智能,实现可靠价值交付。
311 41
AI工程vs传统工程 —「道法术」中的变与不变
|
26天前
|
移动开发 小程序 JavaScript
【RuoYi-SpringBoot3-UniApp】:一套代码,多端运行的移动端开发方案
RuoYi-SpringBoot3-UniApp 是基于 Vue3 与 UniApp 的跨平台移动端解决方案,支持一套代码编译到小程序、App、H5 和桌面端。集成 Pinia 状态管理、JWT 认证、权限路由、z-paging 分页、mp-html 富文本等主流功能,开箱即用,显著降低多端开发与维护成本,助力高效构建企业级应用。
259 6
|
30天前
|
人工智能 中间件 API
2026 AI 大模型 LLM API 生态全景:AnythingLLM、OpenRouter、LiteLLM 与 n1n.ai 深度对比
面对 AI 生态的爆发,如何选择合适的 LLM API 基础设施?本文深度横评 AnythingLLM、OpenRouter、LiteLLM 与 n1n.ai 四大主流工具。从个人 AI 开发到企业级 AI 大模型部署,剖析各平台在 AI API 聚合及成本控制上的优劣,助你构建高效的 AI 大模型技术栈。
446 9
|
19天前
|
弹性计算
阿里云轻量应用服务器38元,云服务器2核2G99元与2核4G199元购买入口,亲测有效
阿里云最便宜的轻量应用服务器38元,最便宜的云服务器云服务器2核2G3M99元与2核4G5M199元,在哪里购买呢?有部分新手用户不知道购买入口了。本文为大家分享几个亲测有效的入口,都是官方购买入口,以供参考。
163 14
|
30天前
|
安全 Java Maven
【RuoYi-SpringBoot3-Pro】:ClassFinal 代码加密
本文介绍RuoYi-SpringBoot3-Pro如何集成ClassFinal实现Java代码加密,保护核心业务逻辑。通过Maven插件对class文件与配置文件进行AES加密,防止反编译泄露,支持选择性加密、密码验证与机器码绑定,适用于商业交付、私有化部署等场景,保障知识产权安全。
279 5