引言
长话短说,今天聊一聊分布式定时任务,我的流水账笔记:
细心朋友稍一分析,就知道还有问题:
水平扩展后的WebApp的Quartz.net定时任务会多次触发, 因为webapp实例使用的是默认的RAMJobStore
, 多实例在内存中都维护了Job和Trigger的副本.
我的定时任务是同步任务,多次执行倒是没有太大问题,但对于特定业务的定时任务, 多次执行可能是致命问题。
基于此,来看看Quartz.net 分布式定时任务的姿势
AdoJobStore
很明显,水平扩展的多实例需要一个 独立于web实例的机制来存储Job和Trigger.
Quartz.NET提供ADO.NET JobStore来存储任务数据。
- 先使用SQL脚本在数据库中生成指定的表结构
执行脚本之后,会看到数据库中多出几个以 QRTZ_开头的表
- 配置Quartz.net使用AdoJobStore
可采用编码形式或者 quartz.config形式添加配置
快速实践
1. 预先生成Job、Trigger表
从https://github.com/quartznet/quartznet/tree/master/database/tables 下载合适的数据库表脚本, 生成指定的表结构
2. 添加AdoJobStore
本次使用编码方式添加AdoJobStore配置。
首次启动会将代码中Job和Trigger持久化到sqlite,后面就直接从sqlite中加载Job和Trigger
using System; using System.Collections.Specialized; using System.Data; using System.Threading.Tasks; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using Quartz; using Quartz.Impl; using Quartz.Impl.AdoJobStore.Common; using Quartz.Spi; namespace EqidManager { using IOCContainer = IServiceProvider; public class QuartzStartup { public IScheduler Scheduler { get; set; } private readonly ILogger _logger; private readonly IJobFactory iocJobfactory; public QuartzStartup(IOCContainer IocContainer, ILoggerFactory loggerFactory) { _logger = loggerFactory.CreateLogger<QuartzStartup>(); iocJobfactory = new IOCJobFactory(IocContainer); DbProvider.RegisterDbMetadata("sqlite-custom", new DbMetadata() { AssemblyName = typeof(SqliteConnection).Assembly.GetName().Name, ConnectionType = typeof(SqliteConnection), CommandType = typeof(SqliteCommand), ParameterType = typeof(SqliteParameter), ParameterDbType = typeof(DbType), ParameterDbTypePropertyName = "DbType", ParameterNamePrefix = "@", ExceptionType = typeof(SqliteException), BindByName = true }); var properties = new NameValueCollection { ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz", ["quartz.jobStore.useProperties"] = "true", ["quartz.jobStore.dataSource"] = "default", ["quartz.jobStore.tablePrefix"] = "QRTZ_", ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SQLiteDelegate, Quartz", ["quartz.dataSource.default.provider"] = "sqlite-custom", ["quartz.dataSource.default.connectionString"] = "Data Source=EqidManager.db", ["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz", ["quartz.serializer.type"] = "binary" }; var schedulerFactory = new StdSchedulerFactory(properties); Scheduler = schedulerFactory.GetScheduler().Result; Scheduler.JobFactory = iocJobfactory; } public async Task<IScheduler> ScheduleJob() { var _eqidCounterResetJob = JobBuilder.Create<EqidCounterResetJob>() .WithIdentity("EqidCounterResetJob") .Build(); var _eqidCounterResetJobTrigger = TriggerBuilder.Create() .WithIdentity("EqidCounterResetCron") .StartNow() //每天凌晨0s .WithCronSchedule("0 0 0 * * ?") Seconds,Minutes,Hours,Day-of-Month,Month,Day-of-Week,Year(optional field) .Build(); // 这里一定要先判断是否已经从SQlite中加载了Job和Trigger if (!await Scheduler.CheckExists(new JobKey("EqidCounterResetJob")) && !await Scheduler.CheckExists(new TriggerKey("EqidCounterResetCron"))) { await Scheduler.ScheduleJob(_eqidCounterResetJob, _eqidCounterResetJobTrigger); } await Scheduler.Start(); return Scheduler; } public void EndScheduler() { if (Scheduler == null) { return; } if (Scheduler.Shutdown(waitForJobsToComplete: true).Wait(30000)) Scheduler = null; else { } _logger.LogError("Schedule job upload as application stopped"); } } }
上面是Quartz.NET 从sqlite中加载Job和Trigger的核心代码
这里要提示两点:
① IOCJobFactory 是自定义JobFactory,目的是与ASP.NET Core原生依赖注入结合
② 在调度任务的时候,先判断是否已经从sqlite加载了Job和Trigger
3.添加Quartz.Net UI轮子
附赠Quartz.NET的调度UI: CrystalQuartz, 方便在界面管理和调度任务
① Install-Package CrystalQuartz.AspNetCore -IncludePrerelease
② Startup启用CrystalQuartz
using CrystalQuartz.AspNetCore; /* * app is IAppBuilder * scheduler is your IScheduler (local or remote) */ var quartz = app.ApplicationServices.GetRequiredService<QuartzStartup>(); var _schedule = await quartz.ScheduleJob(); app.UseCrystalQuartz(() => scheduler);
③ 在localhost:YOUR_PORT/quartz地址查看调度
总结输出
- Quartz.net以AdoJobStore支撑分布式定时任务,解决多实例多次触发的问题
- 快速抛出轮子:Quartz.Net UI库