这里可以说已经把Spring怎么发现Task、执行Task的流程讲解通了。这里忽略了一个重要的类:ScheduledTaskRegistrar,它整体作为一个注册中心的角色,非常的重要,下面讲解它:
ScheduledTaskRegistrar
ScheduledTask注册中心,ScheduledTaskHolder接口的一个重要的实现类,维护了程序中所有配置的ScheduledTask。
它是ScheduledAnnotationBeanPostProcessor的一个重要角色。
//@since 3.0 它在Spring3.0就有了 // 这里面又有一个重要的接口:我们可议通过扩展实现此接口,来定制化属于自己的ScheduledTaskRegistrar 下文会有详细介绍 // 它实现了InitializingBean和DisposableBean,所以我们也可以把它放进容器里面 public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean { // 任务调度器 @Nullable private TaskScheduler taskScheduler; // 该类事JUC包中的类 @Nullable private ScheduledExecutorService localExecutor; // 对任务进行分类 管理 @Nullable private List<TriggerTask> triggerTasks; @Nullable private List<CronTask> cronTasks; @Nullable private List<IntervalTask> fixedRateTasks; @Nullable private List<IntervalTask> fixedDelayTasks; private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16); private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16); // 调用者可议自己指定一个TaskScheduler public void setTaskScheduler(TaskScheduler taskScheduler) { Assert.notNull(taskScheduler, "TaskScheduler must not be null"); this.taskScheduler = taskScheduler; } // 这里,如果你指定的是个TaskScheduler、ScheduledExecutorService都是阔仪得 // ConcurrentTaskScheduler也是一个TaskScheduler的实现类 public void setScheduler(@Nullable Object scheduler) { if (scheduler == null) { this.taskScheduler = null; } else if (scheduler instanceof TaskScheduler) { this.taskScheduler = (TaskScheduler) scheduler; } else if (scheduler instanceof ScheduledExecutorService) { this.taskScheduler = new ConcurrentTaskScheduler(((ScheduledExecutorService) scheduler)); } else { throw new IllegalArgumentException("Unsupported scheduler type: " + scheduler.getClass()); } } @Nullable public TaskScheduler getScheduler() { return this.taskScheduler; } // 将触发的任务指定为可运行文件(任务)和触发器对象的映射 typically custom implementations of the {@link Trigger} interface // org.springframework.scheduling.Trigger public void setTriggerTasks(Map<Runnable, Trigger> triggerTasks) { this.triggerTasks = new ArrayList<>(); triggerTasks.forEach((task, trigger) -> addTriggerTask(new TriggerTask(task, trigger))); } // 主要处理` <task:*>`这种配置 public void setTriggerTasksList(List<TriggerTask> triggerTasks) { this.triggerTasks = triggerTasks; } public List<TriggerTask> getTriggerTaskList() { return (this.triggerTasks != null? Collections.unmodifiableList(this.triggerTasks) : Collections.emptyList()); } // 这个一般是最常用的 CronTrigger:Trigger的一个实现类。另一个实现类为PeriodicTrigger public void setCronTasks(Map<Runnable, String> cronTasks) { this.cronTasks = new ArrayList<>(); cronTasks.forEach(this::addCronTask); } public void setCronTasksList(List<CronTask> cronTasks) { this.cronTasks = cronTasks; } public List<CronTask> getCronTaskList() { return (this.cronTasks != null ? Collections.unmodifiableList(this.cronTasks) : Collections.emptyList()); } ... // 判断是否还有任务 public boolean hasTasks() { return (!CollectionUtils.isEmpty(this.triggerTasks) || !CollectionUtils.isEmpty(this.cronTasks) || !CollectionUtils.isEmpty(this.fixedRateTasks) || !CollectionUtils.isEmpty(this.fixedDelayTasks)); } /** * Calls {@link #scheduleTasks()} at bean construction time. */ // 这个方法很重要:开始执行所有已经注册的任务们~~ @Override public void afterPropertiesSet() { scheduleTasks(); } @SuppressWarnings("deprecation") protected void scheduleTasks() { // 这一步非常重要:如果我们没有指定taskScheduler ,这里面会new一个newSingleThreadScheduledExecutor // 显然它并不是是一个真的线程池,所以他所有的任务还是得一个一个的One by One的执行的 请务必注意啊~~~~ // 默认是它:Executors.newSingleThreadScheduledExecutor() 所以肯定串行啊 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } // 加下来做得事,就是借助TaskScheduler来启动每个任务 // 并且把启动了的任务最终保存到scheduledTasks里面~~~ 后面还会介绍TaskScheduler的两个实现 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } } private void addScheduledTask(@Nullable ScheduledTask task) { if (task != null) { this.scheduledTasks.add(task); } } @Override public Set<ScheduledTask> getScheduledTasks() { return Collections.unmodifiableSet(this.scheduledTasks); } // 销毁: task.cancel()取消所有的任务 调用的是Future.cancel() // 并且关闭线程池localExecutor.shutdownNow() @Override public void destroy() { for (ScheduledTask task : this.scheduledTasks) { task.cancel(); } if (this.localExecutor != null) { this.localExecutor.shutdownNow(); } } }
展示默认情况下任务不能并行的Demo
首先,采用全默认配置执行定时器:
@Scheduled(cron = "0/2 * * * * ?") public void fun1(){ System.out.println(Thread.currentThread().getName()); // 模拟任务内抛出异常~~~~ throw new RuntimeException("aaaaaaaa"); } pool-2-thread-1_61 pool-2-thread-1_61 pool-2-thread-1_61 pool-2-thread-1_61 ...
从这里可以得出结论:默认情况下使用的使用的是线程池但是因为只有一个线程,所以使用的都是它。但是需要注意的是,虽然是同一个线程但是即使内部抛出了异常,线程也不会终止的,所以请放心。原因如下:
DelegatingErrorHandlingRunnable#run
public class DelegatingErrorHandlingRunnable implements Runnable { ... @Override public void run() { try { this.delegate.run(); } catch (UndeclaredThrowableException ex) { this.errorHandler.handleError(ex.getUndeclaredThrowable()); } catch (Throwable ex) { this.errorHandler.handleError(ex); } } ... }
人家都给你try住了,并且还交给了你自定义的errorHandler去处理,可谓还是非常友好的~
让任务并行执行?
不解释了,就是自定义配置一个线程池给它,提高执行效率。具体请参照下面有标准使用方式~~~
备注:SpringBoot本身默认的执行方式是串行执行,无论有多少task,都是一个线程串行执行。若你所想提高效率,请提供线程池
@Scheduled注解各属性含义
参考:@Scheduled注解各参数详解
Quartz和Spring schedule简单对比
Quartz是个著名的、强大的、开源的任务调度框架。下面来几点非常简单的对比,供各位参考:
备注:这里只谈单机版的,不说分布式的,在分布式中有其他的框架可以解决,同时quartz也是可以支持分布式的。
- schedule:支持cron、不支持持久化、开发非常非常非常简单
- quartz:支持cron、支持持久化、开发复杂
虽然quartz看起来更强大些、也支持分布式。但是,但是,但是和@Schedule简单的开发步骤,如果你只是简单的任务,完全用Spring的就可以了。 若是分布式环境,我们一般也不会直接使用quartz,而是选择其它更适合的产品(虽然它有可能是基于quartz定制的~~~)
我们可以把@Schedule看做一个轻量级的Quartz,但它使用起来极其方便
使用推荐配置
默认的,SchedulingConfigurer 使用的也是单线程的方式,如果需要配置多线程,则需要指定 PoolSize
@EnableScheduling @Configuration public class ScheduldConfig implements SchedulingConfigurer { // 下面是推荐的配置,当然你也可以简单粗暴的使用它: 开启100个核心线程 足够用了吧 // taskRegistrar.setScheduler(Executors.newScheduledThreadPool(100)); @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { // 方式一:可议直接使用一个TaskScheduler 然后设置上poolSize等参数即可 (推荐) //ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); //taskScheduler.setPoolSize(10); //taskScheduler.initialize(); // 记得调用啊 //taskRegistrar.setTaskScheduler(taskScheduler); // 方式二:ScheduledExecutorService(使用ScheduledThreadPoolExecutor,它继承自JUC的ThreadPoolExecutor,是一个和任务调度相关的线程池) ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(10); taskRegistrar.setScheduler(poolExecutor); // =========示例:这里,我们也可以注册一个任务,一直执行的~~~========= taskRegistrar.addFixedRateTask(() -> System.out.println("执行定时任务1: " + new Date()), 1000); } }
其实这里给了我们打开了一个思路。通过这我们可以捕获到ScheduledTaskRegistrar,从而我们可以通过接口动态的去改变任务的执行时间、以及对任务的增加、删、改、查等操作,有兴趣的小伙伴可以动手试试
总结
Task在平时业务开发中确实使用非常的广泛,但在分布式环境下,其实已经很少使用Spring自带的定时器了,而使用分布式任务调度框架:Elastic-job、xxl-job等
另外说几点使用细节:
- 标注@Scheduled注解的方法必须无入数
- cron、fixedDelay、fixedRate注解属性必须至少一个
- 若在分布式环境(或者集群环境)直接使用Spring的Scheduled,请使用分布式锁或者保证任务的幂等
网上有一个谣言:说@Schedule若发生异常后,后面该任务就死掉了不会再执行了。如果看了本文分析的原理,显然就不攻自破了。结论:抛出异常后任务还是会执行的