【小家Spring】Spring任务调度@Scheduled的使用以及原理、源码分析(@EnableScheduling)(下)

简介: 【小家Spring】Spring任务调度@Scheduled的使用以及原理、源码分析(@EnableScheduling)(下)

这里可以说已经把Spring怎么发现Task、执行Task的流程讲解通了。这里忽略了一个重要的类:ScheduledTaskRegistrar,它整体作为一个注册中心的角色,非常的重要,下面讲解它:


ScheduledTaskRegistrar


ScheduledTask注册中心,ScheduledTaskHolder接口的一个重要的实现类,维护了程序中所有配置的ScheduledTask。


它是ScheduledAnnotationBeanPostProcessor的一个重要角色。

//@since 3.0 它在Spring3.0就有了
// 这里面又有一个重要的接口:我们可议通过扩展实现此接口,来定制化属于自己的ScheduledTaskRegistrar 下文会有详细介绍
// 它实现了InitializingBean和DisposableBean,所以我们也可以把它放进容器里面
public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
  // 任务调度器
  @Nullable
  private TaskScheduler taskScheduler;
  // 该类事JUC包中的类
  @Nullable
  private ScheduledExecutorService localExecutor;
  // 对任务进行分类 管理
  @Nullable
  private List<TriggerTask> triggerTasks;
  @Nullable
  private List<CronTask> cronTasks;
  @Nullable
  private List<IntervalTask> fixedRateTasks;
  @Nullable
  private List<IntervalTask> fixedDelayTasks;
  private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<>(16);
  private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(16);
  // 调用者可议自己指定一个TaskScheduler 
  public void setTaskScheduler(TaskScheduler taskScheduler) {
    Assert.notNull(taskScheduler, "TaskScheduler must not be null");
    this.taskScheduler = taskScheduler;
  }
  // 这里,如果你指定的是个TaskScheduler、ScheduledExecutorService都是阔仪得
  // ConcurrentTaskScheduler也是一个TaskScheduler的实现类
  public void setScheduler(@Nullable Object scheduler) {
    if (scheduler == null) {
      this.taskScheduler = null;
    }
    else if (scheduler instanceof TaskScheduler) {
      this.taskScheduler = (TaskScheduler) scheduler;
    }
    else if (scheduler instanceof ScheduledExecutorService) {
      this.taskScheduler = new ConcurrentTaskScheduler(((ScheduledExecutorService) scheduler));
    }
    else {
      throw new IllegalArgumentException("Unsupported scheduler type: " + scheduler.getClass());
    }
  }
  @Nullable
  public TaskScheduler getScheduler() {
    return this.taskScheduler;
  }
  // 将触发的任务指定为可运行文件(任务)和触发器对象的映射  typically custom implementations of the {@link Trigger} interface
  // org.springframework.scheduling.Trigger
  public void setTriggerTasks(Map<Runnable, Trigger> triggerTasks) {
    this.triggerTasks = new ArrayList<>();
    triggerTasks.forEach((task, trigger) -> addTriggerTask(new TriggerTask(task, trigger)));
  }
  // 主要处理` <task:*>`这种配置
  public void setTriggerTasksList(List<TriggerTask> triggerTasks) {
    this.triggerTasks = triggerTasks;
  }
  public List<TriggerTask> getTriggerTaskList() {
    return (this.triggerTasks != null? Collections.unmodifiableList(this.triggerTasks) :
        Collections.emptyList());
  }
  // 这个一般是最常用的 CronTrigger:Trigger的一个实现类。另一个实现类为PeriodicTrigger
  public void setCronTasks(Map<Runnable, String> cronTasks) {
    this.cronTasks = new ArrayList<>();
    cronTasks.forEach(this::addCronTask);
  }
  public void setCronTasksList(List<CronTask> cronTasks) {
    this.cronTasks = cronTasks;
  }
  public List<CronTask> getCronTaskList() {
    return (this.cronTasks != null ? Collections.unmodifiableList(this.cronTasks) :
        Collections.emptyList());
  }
  ...
  // 判断是否还有任务
  public boolean hasTasks() {
    return (!CollectionUtils.isEmpty(this.triggerTasks) ||
        !CollectionUtils.isEmpty(this.cronTasks) ||
        !CollectionUtils.isEmpty(this.fixedRateTasks) ||
        !CollectionUtils.isEmpty(this.fixedDelayTasks));
  }
  /**
   * Calls {@link #scheduleTasks()} at bean construction time.
   */
  // 这个方法很重要:开始执行所有已经注册的任务们~~
  @Override
  public void afterPropertiesSet() {
    scheduleTasks();
  }
  @SuppressWarnings("deprecation")
  protected void scheduleTasks() {
    // 这一步非常重要:如果我们没有指定taskScheduler ,这里面会new一个newSingleThreadScheduledExecutor
    // 显然它并不是是一个真的线程池,所以他所有的任务还是得一个一个的One by One的执行的  请务必注意啊~~~~
    // 默认是它:Executors.newSingleThreadScheduledExecutor() 所以肯定串行啊
    if (this.taskScheduler == null) {
      this.localExecutor = Executors.newSingleThreadScheduledExecutor();
      this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    // 加下来做得事,就是借助TaskScheduler来启动每个任务
    // 并且把启动了的任务最终保存到scheduledTasks里面~~~ 后面还会介绍TaskScheduler的两个实现
    if (this.triggerTasks != null) {
      for (TriggerTask task : this.triggerTasks) {
        addScheduledTask(scheduleTriggerTask(task));
      }
    }
    if (this.cronTasks != null) {
      for (CronTask task : this.cronTasks) {
        addScheduledTask(scheduleCronTask(task));
      }
    }
    if (this.fixedRateTasks != null) {
      for (IntervalTask task : this.fixedRateTasks) {
        addScheduledTask(scheduleFixedRateTask(task));
      }
    }
    if (this.fixedDelayTasks != null) {
      for (IntervalTask task : this.fixedDelayTasks) {
        addScheduledTask(scheduleFixedDelayTask(task));
      }
    }
  }
  private void addScheduledTask(@Nullable ScheduledTask task) {
    if (task != null) {
      this.scheduledTasks.add(task);
    }
  }
  @Override
  public Set<ScheduledTask> getScheduledTasks() {
    return Collections.unmodifiableSet(this.scheduledTasks);
  }
  // 销毁: task.cancel()取消所有的任务  调用的是Future.cancel()
  // 并且关闭线程池localExecutor.shutdownNow()
  @Override
  public void destroy() {
    for (ScheduledTask task : this.scheduledTasks) {
      task.cancel();
    }
    if (this.localExecutor != null) {
      this.localExecutor.shutdownNow();
    }
  }
}


展示默认情况下任务不能并行的Demo

首先,采用全默认配置执行定时器:


    @Scheduled(cron = "0/2 * * * * ?")
    public void fun1(){
        System.out.println(Thread.currentThread().getName());
        // 模拟任务内抛出异常~~~~
        throw new RuntimeException("aaaaaaaa");
    }
pool-2-thread-1_61
pool-2-thread-1_61
pool-2-thread-1_61
pool-2-thread-1_61
...


从这里可以得出结论:默认情况下使用的使用的是线程池但是因为只有一个线程,所以使用的都是它。但是需要注意的是,虽然是同一个线程但是即使内部抛出了异常,线程也不会终止的,所以请放心。原因如下:

DelegatingErrorHandlingRunnable#run


public class DelegatingErrorHandlingRunnable implements Runnable {
  ...
  @Override
  public void run() {
    try {
      this.delegate.run();
    }
    catch (UndeclaredThrowableException ex) {
      this.errorHandler.handleError(ex.getUndeclaredThrowable());
    }
    catch (Throwable ex) {
      this.errorHandler.handleError(ex);
    }
  }
  ... 
}


人家都给你try住了,并且还交给了你自定义的errorHandler去处理,可谓还是非常友好的~


让任务并行执行?


不解释了,就是自定义配置一个线程池给它,提高执行效率。具体请参照下面有标准使用方式~~~


备注:SpringBoot本身默认的执行方式是串行执行,无论有多少task,都是一个线程串行执行。若你所想提高效率,请提供线程池


@Scheduled注解各属性含义


参考:@Scheduled注解各参数详解


Quartz和Spring schedule简单对比


Quartz是个著名的、强大的、开源的任务调度框架。下面来几点非常简单的对比,供各位参考:


备注:这里只谈单机版的,不说分布式的,在分布式中有其他的框架可以解决,同时quartz也是可以支持分布式的。


  • schedule:支持cron、不支持持久化、开发非常非常非常简单
  • quartz:支持cron、支持持久化、开发复杂

虽然quartz看起来更强大些、也支持分布式。但是,但是,但是和@Schedule简单的开发步骤,如果你只是简单的任务,完全用Spring的就可以了。 若是分布式环境,我们一般也不会直接使用quartz,而是选择其它更适合的产品(虽然它有可能是基于quartz定制的~~~)

我们可以把@Schedule看做一个轻量级的Quartz,但它使用起来极其方便


使用推荐配置

默认的,SchedulingConfigurer 使用的也是单线程的方式,如果需要配置多线程,则需要指定 PoolSize

@EnableScheduling
@Configuration
public class ScheduldConfig implements SchedulingConfigurer {
  // 下面是推荐的配置,当然你也可以简单粗暴的使用它:  开启100个核心线程  足够用了吧
  // taskRegistrar.setScheduler(Executors.newScheduledThreadPool(100));
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // 方式一:可议直接使用一个TaskScheduler  然后设置上poolSize等参数即可  (推荐)
        //ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        //taskScheduler.setPoolSize(10);
        //taskScheduler.initialize(); // 记得调用啊
        //taskRegistrar.setTaskScheduler(taskScheduler);
        // 方式二:ScheduledExecutorService(使用ScheduledThreadPoolExecutor,它继承自JUC的ThreadPoolExecutor,是一个和任务调度相关的线程池)
        ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(10);
        taskRegistrar.setScheduler(poolExecutor);
        // =========示例:这里,我们也可以注册一个任务,一直执行的~~~=========
        taskRegistrar.addFixedRateTask(() -> System.out.println("执行定时任务1: " + new Date()), 1000);
    }
}



其实这里给了我们打开了一个思路。通过这我们可以捕获到ScheduledTaskRegistrar,从而我们可以通过接口动态的去改变任务的执行时间、以及对任务的增加、删、改、查等操作,有兴趣的小伙伴可以动手试试


总结


Task在平时业务开发中确实使用非常的广泛,但在分布式环境下,其实已经很少使用Spring自带的定时器了,而使用分布式任务调度框架:Elastic-job、xxl-job等


另外说几点使用细节:


  1. 标注@Scheduled注解的方法必须无入数
  2. cron、fixedDelay、fixedRate注解属性必须至少一个
  3. 若在分布式环境(或者集群环境)直接使用Spring的Scheduled,请使用分布式锁或者保证任务的幂等


网上有一个谣言:说@Schedule若发生异常后,后面该任务就死掉了不会再执行了。如果看了本文分析的原理,显然就不攻自破了。结论:抛出异常后任务还是会执行的


相关文章
|
1月前
|
安全 Java Spring
Spring之Aop的底层原理
Spring之Aop的底层原理
|
2月前
|
设计模式 前端开发 Java
【深入浅出Spring原理及实战】「夯实基础系列」360全方位渗透和探究SpringMVC的核心原理和运作机制(总体框架原理篇)
【深入浅出Spring原理及实战】「夯实基础系列」360全方位渗透和探究SpringMVC的核心原理和运作机制(总体框架原理篇)
34 0
|
2月前
|
XML Java Shell
【深入浅出Spring原理及实战】「夯实基础系列」360全方位渗透和探究Spring的核心注解开发和实现指南(Spring5的常见的注解)
【深入浅出Spring原理及实战】「夯实基础系列」360全方位渗透和探究Spring的核心注解开发和实现指南(Spring5的常见的注解)
21 1
|
2月前
|
安全 Java 数据安全/隐私保护
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
【深入浅出Spring原理及实战】「EL表达式开发系列」深入解析SpringEL表达式理论详解与实际应用
79 1
|
2月前
|
XML 存储 缓存
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache管理器的实战开发指南(修正篇)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache管理器的实战开发指南(修正篇)
35 0
|
2天前
|
监控 安全 Java
Spring cloud原理详解
Spring cloud原理详解
13 0
|
8天前
|
Java 开发者 微服务
Spring Cloud原理详解
【5月更文挑战第4天】Spring Cloud是Spring生态系统中的微服务框架,包含配置管理、服务发现、断路器、API网关等工具,简化分布式系统开发。核心组件如Eureka(服务发现)、Config Server(配置中心)、Ribbon(负载均衡)、Hystrix(断路器)、Zuul(API网关)等。本文讨论了Spring Cloud的基本概念、核心组件、常见问题及解决策略,并提供代码示例,帮助开发者更好地理解和实践微服务架构。此外,还涵盖了服务通信方式、安全性、性能优化、自动化部署、服务网格和无服务器架构的融合等话题,揭示了微服务架构的未来趋势。
32 6
|
12天前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
18天前
|
安全 Java API
Spring工厂API与原理
Spring工厂API与原理
35 10
|
2月前
|
缓存 NoSQL Java
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南(二)
【深入浅出Spring原理及实战】「缓存Cache开发系列」带你深入分析Spring所提供的缓存Cache功能的开发实战指南
37 0