调度任务动态装载
Scheduling
模块本身已经支持基于NamespaceHandler
支持通过XML
文件配置调度任务,但是笔者一直认为XML
给人的感觉太"重",使用起来显得太笨重,这里打算扩展出JSON
文件配置和基于JDBC
数据源配置(也就是持久化任务,这里选用MySQL
)。根据前文的源码分析,需要用到SchedulingConfigurer
接口的实现,用于在所有调度任务触发之前从外部添加自定义的调度任务。先定义调度任务的一些配置属性类:
// 调度任务类型枚举 @Getter @RequiredArgsConstructor public enum ScheduleTaskType { CRON("CRON"), FIXED_DELAY("FIXED_DELAY"), FIXED_RATE("FIXED_RATE"), ; private final String type; } // 调度任务配置,enable属性为全局开关 @Data public class ScheduleTaskProperties { private Long version; private Boolean enable; private List<ScheduleTasks> tasks; } // 调度任务集合,笔者设计的时候采用一个宿主类中每个独立方法都是一个任务实例的模式 @Data public class ScheduleTasks { // 这里故意叫Klass代表Class,避免关键字冲突 private String taskHostKlass; private Boolean enable; private List<ScheduleTaskMethod> taskMethods; } // 调度任务方法 - enable为任务开关,没有配置会被ScheduleTaskProperties或者ScheduleTasks中的enable覆盖 @Data public class ScheduleTaskMethod { private Boolean enable; private String taskDescription; private String taskMethod; // 时区,cron的计算需要用到 private String timeZone; private String cronExpression; private String intervalMilliseconds; private String initialDelayMilliseconds; } 复制代码
设计的时候,考虑到多个任务执行方法可以放在同一个宿主类,这样可以方便同一种类的任务进行统一管理,如:
public class TaskHostClass { public void task1() { } public void task2() { } ...... public void taskN() { } } 复制代码
细节方面,intervalMilliseconds
和initialDelayMilliseconds
的单位设计为毫秒,使用字符串形式,方便可以基于StringValueResolver
解析配置文件中的属性配置。添加一个抽象的SchedulingConfigurer
:
@Slf4j public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware, EmbeddedValueResolverAware { @Getter private StringValueResolver embeddedValueResolver; private ConfigurableBeanFactory configurableBeanFactory; private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList(); private final Set<String> tasksLoaded = Sets.newHashSet(); @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; } @Override public void afterPropertiesSet() throws Exception { internalTasks.clear(); internalTasks.addAll(loadTaskProperties()); } @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { embeddedValueResolver = resolver; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { for (InternalTaskProperties task : internalTasks) { try { synchronized (tasksLoaded) { String key = task.taskHostKlass() + "#" + task.taskMethod(); // 避免重复加载 if (!tasksLoaded.contains(key)) { if (task instanceof CronTaskProperties) { loadCronTask((CronTaskProperties) task, taskRegistrar); } if (task instanceof FixedDelayTaskProperties) { loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar); } if (task instanceof FixedRateTaskProperties) { loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar); } tasksLoaded.add(key); } else { log.info("调度任务已经装载,任务宿主类:{},任务执行方法:{}", task.taskHostKlass(), task.taskMethod()); } } } catch (Exception e) { throw new IllegalStateException(String.format("加载调度任务异常,任务宿主类:%s,任务执行方法:%s", task.taskHostKlass(), task.taskMethod()), e); } } } private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception { Class<?> klass = ClassUtils.forName(taskHostKlass, null); Object target = configurableBeanFactory.getBean(klass); Method method = ReflectionUtils.findMethod(klass, taskMethod); if (null == method) { throw new IllegalArgumentException(String.format("找不到目标方法,任务宿主类:%s,任务执行方法:%s", taskHostKlass, taskMethod)); } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression()); if (null != cronExpression) { String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone()); TimeZone timeZone; if (null != timeZoneString) { timeZone = TimeZone.getTimeZone(timeZoneString); } else { timeZone = TimeZone.getDefault(); } CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone)); taskRegistrar.addCronTask(cronTask); log.info("装载CronTask[{}#{}()]成功,cron表达式:{},任务描述:{}", cronExpression, pops.taskMethod(), pops.cronExpression(), pops.taskDescription()); } } private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedDelayTask(fixedDelayTask); log.info("装载FixedDelayTask[{}#{}()]成功,固定延迟间隔:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedRateTask(fixedRateTask); log.info("装载FixedRateTask[{}#{}()]成功,固定执行频率:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private long parseDelayAsLong(String value) { if (null == value) { return 0L; } if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private boolean isP(char ch) { return (ch == 'P' || ch == 'p'); } /** * 加载任务配置,预留给子类实现 */ protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception; interface InternalTaskProperties { String taskHostKlass(); String taskMethod(); String taskDescription(); } @Builder protected static class CronTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String cronExpression; private String taskDescription; private String timeZone; @Override public String taskDescription() { return taskDescription; } public String cronExpression() { return cronExpression; } public String timeZone() { return timeZone; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedDelayTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedRateTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } } 复制代码
loadTaskProperties()
方法用于加载任务配置,留给子类实现。
JSON配置
JSON
配置文件的格式如下(类路径下的scheduling/tasks.json
文件):
{ "version": 1, "tasks": [ { "taskKlass": "club.throwable.schedule.Tasks", "taskMethods": [ { "taskType": "FIXED_DELAY", "taskDescription": "processTask1任务", "taskMethod": "processTask1", "intervalMilliseconds": "5000" } ] } ] } 复制代码
每个层级都有一个enable
属性,默认为true
,只有强制指定为false
的时候才不会装载对应的任务调度方法。这里就是简单继承AbstractSchedulingConfigurer
,实现从类路径加载配置的逻辑,定义JsonSchedulingConfigurer
:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer { // 这里把默认的任务配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通过配置项scheduling.json.config.location进行覆盖 @Value("${scheduling.json.config.location:scheduling/tasks.json}") private String location; @Autowired private ObjectMapper objectMapper; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { ClassPathResource resource = new ClassPathResource(location); String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class); if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) { return Lists.newArrayList(); } List<InternalTaskProperties> target = Lists.newArrayList(); for (ScheduleTasks tasks : properties.getTasks()) { if (null != tasks) { List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods(); if (null != taskMethods) { for (ScheduleTaskMethod taskMethod : taskMethods) { if (!Boolean.FALSE.equals(taskMethod.getEnable())) { if (ScheduleTaskType.CRON == taskMethod.getTaskType()) { target.add(CronTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .cronExpression(taskMethod.getCronExpression()) .timeZone(taskMethod.getTimeZone()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) { target.add(FixedDelayTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) { target.add(FixedRateTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } } } } } } return target; } } 复制代码
添加一个配置类和任务类:
@Configuration public class SchedulingAutoConfiguration { @Bean public JsonSchedulingConfigurer jsonSchedulingConfigurer(){ return new JsonSchedulingConfigurer(); } } // club.throwable.schedule.Tasks @Slf4j @Component public class Tasks { public void processTask1() { log.info("processTask1触发.........."); } } 复制代码
启动SpringBoot
应用,某次执行的部分日志如下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 装载FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延迟间隔:5000 ms,初始延迟执行时间:0 ms,任务描述:processTask1任务 2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... ...... 复制代码
这里有些细节没有完善,例如配置文件参数的一些非空判断、配置值是否合法等等校验逻辑没有做,如果要设计成一个工业级的类库,这些方面必须要考虑。
JDBC数据源配置
JDBC
数据源这里用MySQL
举例说明,先建一个调度任务配置表schedule_task
:
CREATE TABLE `schedule_task` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主键', edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '修改者', creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '创建者', deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '软删除标识', task_host_class VARCHAR(256) NOT NULL COMMENT '任务宿主类全类名', task_method VARCHAR(128) NOT NULL COMMENT '任务执行方法名', task_type VARCHAR(16) NOT NULL COMMENT '任务类型', task_description VARCHAR(64) NOT NULL COMMENT '任务描述', cron_expression VARCHAR(128) COMMENT 'cron表达式', time_zone VARCHAR(32) COMMENT '时区', interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '执行间隔时间', initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延迟执行时间', UNIQUE uniq_class_method (task_host_class, task_method) ) COMMENT '调度任务配置表'; 复制代码
其实具体的做法和JSON
配置差不多,先引入spring-boot-starter-jdbc
,接着编写MysqlSchedulingConfigurer
:
// DAO @RequiredArgsConstructor public class MysqlScheduleTaskDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> { List<ScheduleTask> tasks = Lists.newArrayList(); while (r.next()) { ScheduleTask task = new ScheduleTask(); tasks.add(task); task.setId(r.getLong("id")); task.setCronExpression(r.getString("cron_expression")); task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds")); task.setIntervalMilliseconds(r.getLong("interval_milliseconds")); task.setTimeZone(r.getString("time_zone")); task.setTaskDescription(r.getString("task_description")); task.setTaskHostClass(r.getString("task_host_class")); task.setTaskMethod(r.getString("task_method")); task.setTaskType(r.getString("task_type")); } return tasks; }; public List<ScheduleTask> selectAllTasks() { return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI); } } // MysqlSchedulingConfigurer @RequiredArgsConstructor public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer { private final MysqlScheduleTaskDao mysqlScheduleTaskDao; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { List<InternalTaskProperties> target = Lists.newArrayList(); List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks(); if (!tasks.isEmpty()) { for (ScheduleTask task : tasks) { ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType()); if (ScheduleTaskType.CRON == scheduleTaskType) { target.add(CronTaskProperties.builder() .taskMethod(task.getTaskMethod()) .cronExpression(task.getCronExpression()) .timeZone(task.getTimeZone()) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) { target.add(FixedDelayTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) { target.add(FixedRateTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } } } return target; } } 复制代码
记得引入spring-boot-starter-jdbc
和mysql-connector-java
并且激活MysqlSchedulingConfigurer
配置。插入一条记录:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '测试任务', NULL, NULL, 10000, 5000); 复制代码
然后启动服务,某次执行的输出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... 2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发.......... .... 复制代码
混合配置
有些时候我们希望可以JSON
配置和JDBC
数据源配置进行混合配置,或者动态二选一以便灵活应对多环境的场景(例如要在开发环境使用JSON
配置而测试和生产环境使用JDBC
数据源配置,甚至可以将JDBC
数据源配置覆盖JSON
配置,这样能保证总是倾向于使用JDBC
数据源配置),这样需要对前面两小节的实现加多一层抽象。这里的设计可以参考SpringMVC
中的控制器参数解析器的设计,具体是HandlerMethodArgumentResolverComposite
,其实道理是相同的。
其他注意事项
在生产实践中,暂时不考虑生成任务执行日志和细粒度的监控,着重做了两件事:
- 并发控制,(多服务节点下)禁止任务并发执行。
- 跟踪任务的日志轨迹。
解决并发执行问题
一般情况下,我们需要禁止任务并发执行,考虑引入Redisson
提供的分布式锁:
// 引入依赖 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>最新版本</version> </dependency> // 配置类 @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) public class RedissonAutoConfiguration { @Autowired private RedisProperties redisProperties; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort())); if (redisProperties.getDatabase() > 0) { singleServerConfig.setDatabase(redisProperties.getDatabase()); } if (null != redisProperties.getPassword()) { singleServerConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } } // 分布式锁工厂 @Component public class DistributedLockFactory { private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:"; @Autowired private RedissonClient redissonClient; public DistributedLock provideDistributedLock(String lockKey) { String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey; return new RedissonDistributedLock(redissonClient, lockPath); } } 复制代码
这里考虑到项目依赖了spring-boot-starter-redis
,直接复用了它的配置属性类(RedissonDistributedLock
是RLock
的轻量级封装,见附录)。使用方式如下:
@Autowired private DistributedLockFactory distributedLockFactory; public void task1() { DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey); // 等待时间为20秒,持有锁的最大时间为60秒 boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS); if (tryLock) { try { // 业务逻辑 }finally { lock.unlock(); } } } 复制代码
引入MDC跟踪任务的Trace
MDC
其实是Mapped Diagnostic Context
的缩写,也就是映射诊断上下文,一般用于日志框架里面同一个线程执行过程的跟踪(例如一个线程跑过了多个方法,各个方法里面都打印了日志,那么通过MDC
可以对整个调用链通过一个唯一标识关联起来),例如这里选用slf4j
提供的org.slf4j.MDC
:
@Component public class MappedDiagnosticContextAssistant { /** * 在MDC中执行 * * @param runnable runnable */ public void processInMappedDiagnosticContext(Runnable runnable) { String uuid = UUID.randomUUID().toString(); MDC.put("TRACE_ID", uuid); try { runnable.run(); } finally { MDC.remove("TRACE_ID"); } } } 复制代码
任务执行的时候需要包裹成一个Runnale
实例:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("开始执行......"); // 业务逻辑 watch.stop(); log.info("执行完毕,耗时:{} ms......", watch.getTotalTimeMillis()); }); } 复制代码
结合前面一节提到的并发控制,那么最终执行的任务方法如下:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("开始执行......"); scheduleTaskAssistant.executeInDistributedLock("任务分布式锁KEY", () -> { // 真实的业务逻辑 }); watch.stop(); log.info("执行完毕,耗时:{} ms......", watch.getTotalTimeMillis()); }); } 复制代码
这里的方法看起来比较别扭,其实可以直接在任务装载的时候基于分布式锁和MDC
进行封装,方式类似于ScheduledMethodRunnable
,这里不做展开,因为要详细展开篇幅可能比较大(ScheduleTaskAssistant
见附录)。
小结
其实spring-context
整个调度模块完全依赖于TaskScheduler
实现,更底层的是JUC
调度线程池ScheduledThreadPoolExecutor
。如果想要从底层原理理解整个调度模块的运行原理,那么就一定要分析ScheduledThreadPoolExecutor
的实现。整篇文章大致介绍了spring-context
调度模块的加载调度任务的流程,并且基于扩展接口SchedulingConfigurer
扩展出多种自定义配置调度任务的方式,但是考虑到需要在生产环境中运行,那么免不了需要考虑监控、并发控制、日志跟踪等等的功能,但是这样就会使得整个调度模块变重,慢慢地就会发现,这个轮子越造越大,越有主流调度框架Quartz
或者Easy Scheduler
的影子。笔者认为,软件工程,有些时候要权衡取舍,该抛弃的就应该果断抛弃,否则总是负重而行,还能走多远?
参考资料:
SpringBoot
源码
附录
ScheduleTaskAssistant
:
@RequiredArgsConstructor @Component public class ScheduleTaskAssistant { /** * 5秒 */ public static final long DEFAULT_WAIT_TIME = 5L; /** * 30秒 */ public static final long DEFAULT_LEAVE_TIME = 30L; private final DistributedLockFactory distributedLockFactory; /** * 在分布式锁中执行 * * @param waitTime 锁等着时间 * @param leaveTime 锁持有时间 * @param timeUnit 时间单位 * @param lockKey 锁的key * @param task 任务对象 */ public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) { DistributedLock lock = distributedLockFactory.dl(lockKey); boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit); if (tryLock) { try { long waitTimeMillis = timeUnit.toMillis(waitTime); long start = System.currentTimeMillis(); task.run(); long end = System.currentTimeMillis(); long cost = end - start; // 预防锁过早释放 if (cost < waitTimeMillis) { Sleeper.X.sleep(waitTimeMillis - cost); } } finally { lock.unlock(); } } } /** * 在分布式锁中执行 - 使用默认时间 * * @param lockKey 锁的key * @param task 任务对象 */ public void executeInDistributedLock(String lockKey, Runnable task) { executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task); } } 复制代码
RedissonDistributedLock
:
@Slf4j public class RedissonDistributedLock implements DistributedLock { private final RedissonClient redissonClient; private final String lockPath; private final RLock internalLock; RedissonDistributedLock(RedissonClient redissonClient, String lockPath) { this.redissonClient = redissonClient; this.lockPath = lockPath; this.internalLock = initInternalLock(); } private RLock initInternalLock() { return redissonClient.getLock(lockPath); } @Override public boolean isLock() { return internalLock.isLocked(); } @Override public boolean isHeldByCurrentThread() { return internalLock.isHeldByCurrentThread(); } @Override public void lock(long leaseTime, TimeUnit unit) { internalLock.lock(leaseTime, unit); } @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) { try { return internalLock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e); } } @Override public void unlock() { try { internalLock.unlock(); } catch (IllegalMonitorStateException ex) { log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex); } } }