初衷:数据文件的处理,其中涉及到一些定时任务处理。当时刚进XXX公司参与第一个项目,但是意识到公司的产品业务基本上都是围绕数据做一些信息或线索的挖掘,尤其是离线数据,应该会有很多各种各样的任务式场景,编写一个通用的轻量级定时任务处理,能够提高工作中的开发效率和质量,以及运维的能力。
目标:设计一个通用的轻量级任务管理框架,包括各种定时任务以及手动创建的任务,在功能上除了能支持常见的场景外,也希望能支持一定的实时监控管理,而在结构上要有足够的扩展性,方便进行一些定制,同时在使用方式上希望能简洁清晰,对原有代码尽量少的侵入。
由于当时是设计用来处理文件的,又是基于 spring 进行管理的,所以取名为 spring-fom。
1. 思路
下面主要从定时任务的角度进行说明,也并不是说只适用于定时任务场景,定时任务与手动创建任务的区别只是在于创建和提交的方式不同,前者由确定的定时线程负责创建和提交,而后者的创建和提交来自未知的外部线程。这里只说的是任务的创建和提交,而没有说创建执行,是因为任务的执行都委托给了线程池,提交就相当于执行的意思。
其实本身就是基于线程池实现的,主要的任务调度逻辑都封装在ScheduleContext
中,其内部维护了一个定时线程和一个线程池,以及一套自定义的状态转换机制。
每个ScheduleContext
就相当于一个独立的任务调度器,其生命周期,比如加载、启动、终止,完全委托给了 spring 的应用上下文。
对于具体的任务则抽象为 Task
,其实就是在 Callable
的基础上定义了一套任务执行模板,因此,整个 spring-fom 的功能,基本就是围绕 ScheduleContext
和 Task
实现的。
1.1. 功能设计
主要特性:
1. 对于**定时任务场景**,支持基本的三种定时语义(cron / fixedRate / fixedDelay) |
2. 支持定时批(多)任务执行 实现接口`ScheduleFactory`可以创建批任务,或者通过`@Scheduled`指定多个任务方法 实现接口`CompleteHandler`可以自定义批任务结束处理 |
3. 支持任务超时检测处理,通过`taskOverTime`可以设置任务超时时间 实现接口`TaskCancelHandler`可以自定义任务超时的取消处理,默认只通过`Interrupt`中断 通过`detectTimeoutOnEachTask`可以设置是否对整体任务计算超时,默认是对每个任务单独检测超时 |
4. 支持任务冲突检测 通过`enableTaskConflict`可以开启任务冲突检测,即如果提交任务时,发现已经存在对应id的任务,并且还在运行,则忽略本次任务, 这个在文件处理的场景中比较有用; |
5. 支持实时监控管理功能,并提供一些监控和管理的接口,同时内置了一个简单的任务管理界面:http://{ip}:{port}/{path}/fom.html 5.1. 可以实时查看定时器的状态,以及任务执行情况的统计等信息 5.2. 可以实时启动 / 终止定时器,并在终止时尝试取消还在运行的任务; 实现接口`TaskCancelHandler`也可以自定义终止定时器时任务的取消处理,默认只通过`Interrupt`中断 实现接口`TerminateHandler`可以自定义定时器终止时的处理,比如清理一些资源之类 5.3. 可以实时触发任务的执行,比如当定时任务未到执行时机时,可以手动使其立即执行 如果触发时任务正在执行,则支持两种策略:直接忽略(默认),或者等待本次执行完成后立即再重新执行 5.4. 可以实时修改任务配置,并支持持久化,即保证重启后修改依然有效 |
6. 对于**非定时任务场景**,比如提交的批任务 同样支持`CompleteHandler`接口,以及超时检测和冲突检测处理,具体可以见后面的使用示例 |
1.2. 配置
对应上面的功能,下面列举一下配置的定义:
1.3. 状态定义
对于ScheduleContext
中维护的几个状态,其状态转换机制可以用下图表示:
- 如果处于状态:INITED 或 STOPPED
可以接收外部线程的startup
,然后启动定时线程,由定时线程将状态切换为 RUNNING;
- 如果处于状态:RUNNING
正常情况下,定时线程会等待任务结束后,将状态切换为 SLEEPING,如果没有设置定时计划(一次性任务),则切换为 INITED;
另外,可以接收外部线程的shutDown
,由外部线程将状态切换为 STOPPING,并请求中断定时线程。然后定时线程会跳过等待任务结束的过程,并很快检测到 STOPPING 状态, 接着尝试取消还在执行的任务,关闭线程池,并在所有任务真正结束后将状态切换为 STOPPED;
也可以接收外部线程的execNow
,但默认是忽略,如果将ignoreExecRequestWhenRunning
设置为false
,那么会在本次任务结束之后会立即再重新执行一次;
- 如果处于状态:SLEEPING
正常情况下,定时线程会在sleep
结束后进行下一次任务执行,并将状态切换为 RUNNING;
另外,可以接收外部线程的execNow
,由外部线程中断定时线程的等待,然后定时线程立即开始下一次任务执行,并将状态切换为 RUNNING;
也可以接收外部线程的shutDown
,也是由外部线程中断定时线程的等待,但在这之前,外部线程会将状态切换为 STOPPING, 然后定时线程在开始下一次任务之前会检测到这个 STOPPING 状态,接着定时线程会关闭线程池,将状态切换为 STOPPED;
- 如果处于状态:STOPPING
此时忽略一切外部请求,直到定时线程等待所有任务结束之后,由定时线程将状态切换为 STOPPED;
1.4. 定时线程
对于ScheduleContext
中的定时线程,根据上面的状态描述,可以简要画出其执行流程如下:
2. 具体实现
2.1. 加载启动
- 加载
对于任务的加载和启动完全委托给了spring应用上下文,首先定义一个注解@FomSchedule
并继承@Component
,这样spring在加载容器时会帮忙加载标识了@FomSchedule
的目标类, 然后在后面可以通过@FomSchedule
来识别目标类
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface FomSchedule { // ... ... @AliasFor(annotation = Component.class) String value() default ""; // 加载时是否启动 boolean enable() default ENABLE_DEFAULT; // 启动时是否立即执行,默认false boolean execOnLoad() default EXEC_ONLOAN_DEFAULT; // ... ... }
但这里不仅是想让spring帮忙创建和管理目标类的实例,还希望能根据类上面的@FomSchedule
创建对应的ScheduleContext
于是通过实现接口ImportBeanDefinitionRegistrar
扫描所有注册的BeanDefinition
,如果其对应的类上面标识了@FomSchedule
,就注册一个对应的ScheduleContext
定义, 至于beanName
,就在原目标类的beanName
前加一个$
符,表示获取的意思,当然如果目标类已经继承了ScheduleContext
,那么就简单注册一个别名。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(FomBeanDefinitionRegistrar.class) public @interface EnableFom { boolean enableFomView() default true; }
public class FomBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar{ private static AtomicBoolean registed = new AtomicBoolean(false); @Override public void registerBeanDefinitions(AnnotationMetadata meta, BeanDefinitionRegistry registry) { if(!registed.compareAndSet(false, true)){ return; } AnnotationAttributes attrs = AnnotationAttributes.fromMap(meta.getAnnotationAttributes(EnableFom.class.getName())); if((boolean)attrs.get("enableFomView")){ // 注册FomController RootBeanDefinition fomController = new RootBeanDefinition(FomController.class); registry.registerBeanDefinition("fomController", fomController); // 注册FomAdvice RootBeanDefinition fomAdvice = new RootBeanDefinition(FomAdvice.class); registry.registerBeanDefinition("fomAdvice", fomAdvice); // 注册FomServiceImpl RootBeanDefinition fomServiceImpl = new RootBeanDefinition(FomServiceImpl.class); registry.registerBeanDefinition("fomService", fomServiceImpl); } // 注册SchedulePostProcessor RootBeanDefinition fomBeanPostProcessor = new RootBeanDefinition(FomBeanPostProcessor.class); registry.registerBeanDefinition("schedulePostProcessor", fomBeanPostProcessor); // 注册FomScheduleStarter RootBeanDefinition fomScheduleStarter = new RootBeanDefinition(FomScheduleStarter.class); registry.registerBeanDefinition("fomScheduleStarter", fomScheduleStarter); // 注册FomBeanDefinition String[] beanNames = registry.getBeanDefinitionNames(); Class<?> clazz; for(String beanName : beanNames){ BeanDefinition beanDefinition = registry.getBeanDefinition(beanName); String className = beanDefinition.getBeanClassName(); if(className != null){ try { clazz = Class.forName(className); } catch (ClassNotFoundException e) { throw new ApplicationContextException("", e); } FomSchedule fomSchedule = clazz.getAnnotation(FomSchedule.class); if(fomSchedule != null){ parseFomSchedule(beanName, clazz, beanDefinition, fomSchedule, registry); } } } } public void parseFomSchedule(String beanName, Class<?> clazz, BeanDefinition beanDefinition, FomSchedule fomSchedule, BeanDefinitionRegistry registry){ if(ScheduleContext.class.isAssignableFrom(clazz)){ beanDefinition.getPropertyValues().add("scheduleName", beanName); registry.registerAlias(beanName, "$" + beanName); }else{ RootBeanDefinition fomBeanDefinition = new RootBeanDefinition(ScheduleContext.class); fomBeanDefinition.getPropertyValues().add("scheduleBeanName", beanName); fomBeanDefinition.getPropertyValues().add("scheduleName", "$" + beanName); registry.registerBeanDefinition("$" + beanName, fomBeanDefinition); } } }
接下来就是创建一个BeanPostProcessor
,扫描所有注册的Bean
,如果类型是ScheduleContext
,就根据其对应注解中的信息进行一些设置,并创建对应的代理。
不过有个问题就是如何根据ScheduleContext
的实例获取到对应的注解信息,思路就在上面的代码中,即在注册BeanDefinition
时注入了一个属性scheduleBeanName
,用来记住对应的目标类, 这样知道了目标类之后自然能找到其上面的注解信息了。
public class FomBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware, EmbeddedValueResolverAware { // ... ... @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> clazz = bean.getClass(); if(!(ScheduleContext.class.isAssignableFrom(clazz))){ return bean; } ScheduleContext<?> scheduleContext = (ScheduleContext<?>)bean; String scheduleBeanName = scheduleContext.getScheduleBeanName(); FomSchedule fomSchedule = scheduleContext.getClass().getAnnotation(FomSchedule.class); // 通过@Bean注入的不需要处理 if(StringUtils.isEmpty(scheduleBeanName) && fomSchedule == null){ scheduleContext.setScheduleName(beanName); scheduleContext.setLogger(LoggerFactory.getLogger(scheduleContext.getClass())); return bean; } Object scheduleBean = null; if(!StringUtils.isEmpty(scheduleBeanName)){ scheduleBean = beanFactory.getBean(scheduleBeanName); } // 设置Logger fomSchedule = clazz.getAnnotation(FomSchedule.class); if(fomSchedule == null){ fomSchedule = scheduleBean.getClass().getAnnotation(FomSchedule.class); scheduleContext.setLogger(LoggerFactory.getLogger(scheduleBean.getClass())); }else{ scheduleContext.setLogger(LoggerFactory.getLogger(clazz)); } // 加载配置 ScheduleConfig scheduleConfig = scheduleContext.getScheduleConfig(); if(fomSchedule != null){ // 注解 setCronConf(scheduleConfig, fomSchedule, scheduleContext, scheduleBean); setOtherConf(scheduleConfig, fomSchedule); setValue(scheduleConfig, scheduleContext, scheduleBean); }else{ // xml配置 TODO } // 加载缓存配置(之前修改配置后持久化的文件) try { loadCache(beanName, scheduleContext); } catch (Exception e) { throw new ApplicationContextException("", e); } // 刷新配置 scheduleConfig.refresh(); // 创建代理 注册容器 Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(clazz); enhancer.setCallback(new ScheduleProxy(beanName, scheduleContext, fomSchedule, scheduleBean)); Object obj = enhancer.create(); return obj; } }
- 启动
至于启动就很简单了,直接实现接口Lifecycle
委托给spring在加载之后进行启动
public class FomScheduleStarter implements SmartLifecycle, ApplicationContextAware { // ... ... @SuppressWarnings("rawtypes") @Override public void start() { String[] scheduleNames = applicationContext.getBeanNamesForType(ScheduleContext.class); for(String scheduleName : scheduleNames){ ScheduleContext<?> schedule = (ScheduleContext)applicationContext.getBean(scheduleName); ScheduleConfig config = schedule.getScheduleConfig(); if(config.getBoolean(FomSchedule.ENABLE, true)){ schedule.scheduleStart(); logger.info("load and start schedule[{}]: {}", scheduleName, schedule.getScheduleConfig().getConfMap()); }else{ logger.info("load schedule[{}]: {}", scheduleName, schedule.getScheduleConfig().getConfMap()); } } } @SuppressWarnings("rawtypes") @Override public void stop() { String[] scheduleNames = applicationContext.getBeanNamesForType(ScheduleContext.class); for(String scheduleName : scheduleNames){ ScheduleContext<?> schedule = (ScheduleContext)applicationContext.getBean(scheduleName); schedule.scheduleShutdown(); } } // ... ... }
2.2. 任务执行
除了定时线程和状态之外,每个ScheduleContext
还维护了一个私有的线程池,其对于具体的任务执行可以直接委托给线程池,其实ScheduleContext
中的状态基本就是其线程池状态的体现。
但有个问题是如何检测提交的任务在什么时候结束,尤其是提交了多个任务之后。简单一点的想法是由定时线程在提交任务时记录一个提交任务数,然后由任务线程在结束时将计数减1, 如果减完后计数为0,则表示任务全部结束。
而在实现中,会发现在提交时这个任务的实际提交数无法直接确定,暂且不考虑提交异常的问题,如果要检测任务冲突,那就只能进行遍历提交,每个任务在提交时都需要检测任务id是否已经存在并正在运行, 如果是则放弃本次任务,这样只能在每个任务实际提交成功后才能将提交任务数加1。
于是问题就变复杂了,因为在任务线程结束,并将任务计数减1的同时,定时线程可能也在提交新的任务,并将任务计数加1。
所以,首先要保证在检测任务是否全部结束时,提交任务这个动作已经结束,否则如果任务执行得足够快,可能出现后面的任务还没来得及提交,任务线程已经全部执行完并将计数降为0, 从而误以为所有的任务都已经结束了。对于这个问题,可以添加一个标识,用来表示提交任务是否结束,然后定时线程在提交结束后将这个标识置为真,这样任务线程在结束并将计数减1后,除了判断计数是否为0,还需要判断提交标识是否已经结束。
还有一个问题是在定时线程提交完任务,并将提交结束标识置为真,两个操作之间并没有同步。那么可能出现这样的场景:定时线程已经提交结束,并且任务线程全部结束了, 但在任务线程判断计数是否为0时,定时线程还没来得及将提交结束标识置为真,那么任务线程就会误以为任务还没全部结束,这样如果希望在任务全部结束时触发执行一些事件,就可能错失事件的执行时机。所以,在定时线程提交结束之后,也需要检测一下任务是否已经全部结束,这样如果任务线程错过了执行机会,可以由定时线程来作下把关。
在实现中,对于上面提及的两个变量:任务提交数和提交结束标识,以及相关的操作,都封装在CompleteLatch
中:
static class CompleteLatch<E> { // ... ... // 任务是否提交结束 private volatile boolean hasSubmitCompleted = false; // 还没有结束的任务数 private final AtomicInteger taskNotCompleted = new AtomicInteger(1); // 闭锁,等待任务全部提交并执行结束 private final CountDownLatch latch = new CountDownLatch(1); // ... ... public void submitCompleted(){ hasSubmitCompleted = true; } public boolean hasSubmitCompleted(){ return hasSubmitCompleted; } public void taskCompleted(){ latch.countDown(); } public boolean waitTaskCompleted(long taskOverTime) throws InterruptedException{ return latch.await(taskOverTime, TimeUnit.MILLISECONDS); } public void waitTaskCompleted() throws InterruptedException{ latch.await(); } public long increaseTaskNotCompleted(){ return taskNotCompleted.incrementAndGet(); } public boolean hasTaskCompleted(){ return taskNotCompleted.decrementAndGet() == 0; } public long getTaskNotCompleted(){ return taskNotCompleted.get(); } }
借助上面CompleteLatch
提供的操作,可以简要画出下图来描述一下定时线程提交任务,并等待任务结束的流程
2.3. 超时检测
上面CompleteLatch
提供了一个限时等待方法waitTaskCompleted(long taskOverTime)
,如果任务设置了超时,那么在等待任务结束时将会使用限时等待。
超时检测的实现思路并不难想,主要是借助于延时队列DelayQueue
,并通过Delayed
来包装一下任务对应的future
这样当定时线程提交结束后,首先等一个给定的超时时间overTime
,如果等完还有任务没有结束,那么获取这些任务的耗时。如果已经超时,则尝试取消;如果还没有超时,那么计算一下任务剩余的可用时间, 并重新封装成Delayed
放入延时队列。
接下来就是反复从延时队列中获取一个时间最近的任务来判断,同样的如果超时了就取消,否则重新计算剩余时间再放回队列。这样如果最后队列为空,就表示所有的任务都已经结束或者超时。要注意的是,这时并不代表任务都已真正结束,定时线程对于每个任务,检测到超时只会尝试一次取消,如果任务不响应,定时线程也没办法,可能还要通过waitTaskCompleted()
来进行最后的兜底。
省去一些不相关的代码后,具体实现可以简约如下:
private void waitTaskCompleted(CompleteLatch<E> completeLatch){ // ... ... long overTime = scheduleConfig.getTaskOverTime(); // ... ... if(completeLatch.waitTaskCompleted(overTime)){ cleanCompletedFutures(); }else{ DelayQueue<TaskDelayed> delayQueue = new DelayQueue<>(); for(TimedFuture<Result<E>> future : submitFutures){ waitTaskFuture(future, delayQueue, overTime); } while(!delayQueue.isEmpty()){ TaskDelayed taskDelayed = delayQueue.take(); waitTaskFuture(taskDelayed.getFuture(), delayQueue, overTime); } long taskNotCompleted = completeLatch.getTaskNotCompleted(); if(taskNotCompleted > 0){ logger.warn("some[{}] tasks cancel fails, which may not respond to interrupts.", taskNotCompleted); completeLatch.waitTaskCompleted(); } cleanCompletedFutures(); } // ... ... } private void waitTaskFuture(TimedFuture<Result<E>> future, DelayQueue<TaskDelayed> delayQueue, long overTime){ if(!future.isDone()) { long startTime = future.getStartTime(); if(startTime == 0){ // startTime = 0 表示任务还没启动 delayQueue.add(new TaskDelayed(future, overTime)); }else{ long cost = System.currentTimeMillis() - future.getStartTime(); if(cost >= overTime){ try{ handleCancel(future.getTaskId(), cost); }catch(Exception e){ logger.error("", e); } logger.info("cancle task[{}] due to time out, cost={}ms", future.getTaskId(), cost); future.cancel(true); }else{ delayQueue.add(new TaskDelayed(future, overTime - cost)); } } } }
2.4. 任务关闭
根据Java中的线程机制,如果想从外部取消线程,应该通过中断标识来进行通知,由目标线程自行决定在何时、以及使用何种方式结束自己。所以在关闭任务时,外部线程只做两件事,将状态置为STOPPING
,然后中断定时线程
对于外部的关闭请求,只有两个状态(RUNNING
和SLEEPING
)会进行响应处理:
public Response<Void> scheduleShutdown(){ synchronized (this) { switch(state){ // ... ... case RUNNING: case SLEEPING: state = STOPPING; scheduleThread.interrupt(); //尽快响应 if(scheduleConfig.getPool().isTerminated()){ state = STOPPED; isFirstRun = true; } logger.info("schedule[{}] will stop soon.", scheduleName); return new Response<>(Response.SUCCESS, "schedule[" + scheduleName + "] will stop soon."); // ... ... } } }
然后定时线程在执行过程借助Java Api检测中断请求,如果检测到中断,那么立即重新检查状态,如果为STOPPING
,那么进行关闭清理操作, 即shutdown
线程池,然后awaitTermination
等待线程池结束。
下面通过一段伪代码来描述ScheduleContext
中定时线程如何处理关闭操作的,即在RUNNING
和SLEEPING
状态下如何响应关闭请求
private class ScheduleThread extends Thread { @Override public void run() { while(true){ if(state == STOPPING) { terminate(); return } state = RUNNING submit and execute tasks ... try{ waitTaskCompleted ... }catch (InterruptedException e) { Thread.currentThread().interrupt(); // 保留中断请求,后面检测处理 } state = SLEEPING try { wait(waitTime); } catch (InterruptedException e) { // 响应中断:结束等待,并立即重新检测state } } } }
3. 其它
到这里,主要的实现思路基本已经说完了,下面的功能只是为了使用上更方便一些。在内置的任务界面中体现:http://{ip}:{port}/{path}/fom.html,当然,如果觉得界面不满足,或者希望将一些功能放到自己的界面中, 那么也提供了对应的接口服务FomService
,任务界面上所有的后台接口都由FomService
提供。
3.1. 关于统计
每个ScheduleContext
对各自执行的任务都会有一些统计信息,比如成功失败数、等待数、正在执行数以及结果等。这些都封装ScheduleStatistics
中,其在ScheduleContext
创建时初始化, 然后由每个Task
在结束时自行更新统计结果。
另外,也可以实现接口ResultHandler
自定义任务结果的处理,比如持久化到文件或数据库,默认就在内存中保存7
天的统计数据,方面界面做一些分析使用。
3.2. 关于配置
对于每个ScheduleContext
定时模块,也都有自己的一些配置,具体封装在ScheduleConfig
中,其内部也是委托给了ConcurrentMap
进行管理,然后对get/put
调用做了一下封装,但不支持remove
操作, 也就是说可以实时新增或修改配置,但不允许删除操作。
对于配置项的修改,会进行持久化,这样保证了重启后修改依然有效,另外,会尝试检查修改的配置是否存在于当前ScheduleContext
的一些@Value
属性中, 如果是,则会帮忙将配置值进行属性注入,注意这里只会修改当前所属ScheduleContext
的属性,不影响其他地方的配置。
此外,通过注入ScheduleService
,也可以在任务执行过程中手动进行新增或修改,然后同样能通过接口获取或修改
public interface ScheduleService { // 序列化当前schedule的配置 public void serializeCurrent(); // 序列化指定schedule配置 public void serialize(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName); // 设置当前schedule的配置 public void putCurrentConfig(String key, Object value); // 设置指定schedule的配置 public void putConfig(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName, String key, Object value); // 获取当前schedule的配置 public <V> V getCurrentConfig(String key); // 获取指定schedule的配置 public <V> V getConfig(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName, String key); }
3.3. 关于日志
对于日志,通过slf4j
进行创建,这样就不依赖于具体的日志实现,比如log4j
、log4j2
、或logback
,然后复用了下spring-boot-starter-actuator
中的日志监控模块,实现了任务日志级别实时修改的功能, 并在原来的基础上增加了对log4j
的适配。
另外,对于每个定时任务的日志,默认会使用标识了@FomSchedule
的目标类来进行初始化,如2.1中所示:
scheduleContext.setLogger(LoggerFactory.getLogger(scheduleBean.getClass()));
这样就可以将任务上下文的日志,与目标类中具体任务实现过程中的日志打到一起了。