SpringBoot-ElasticJob封装快速上手使用(分布式定时器)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: SpringBoot-ElasticJob封装快速上手使用(分布式定时器)

elastic-job-spring-boot

qq交流群:812321371

1 简介

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-LiteElastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

基于quartz定时任务框架为基础的,因此具备quartz的大部分功能

使用zookeeper做协调,调度中心,更加轻量级

支持任务的分片

支持弹性扩容,可以水平扩展, 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务

失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务

提供运维界面,可以管理作业和注册中心。

1.1 使用场景

由于项目为微服务,单模块可能在两个实例以上的数量,定时器就会出现多实例同时执行的情况。

一般定时器缺少管理界面,无法监控定时器是否执行成功。

市面上常见的解决方案为定时器加锁的操作,或者采用第3方分布式定时器。

分布式定时器有多种方案,比如阿里内部的ScheduledX,当当网的Elastic job,个人开源的xxl-job等。

1.2 功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

1.3 概念

分片:任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。 例如:有一个遍历 数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

历史轨迹:Elastic-Job提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。

1.4 封装elasticjob

由于当当网Elastic job处于1年间未更新阶段,相关jar处于可以使用阶段功能不全。考虑到使用场景为多项目使用,将elastic-job-lite-spring简单封装便于使用。

2.使用说明:

2.1 添加依赖

ps:实际version版本请使用最新版

<dependency>
  <groupId>com.purgeteam</groupId>
  <artifactId>elasticjob-spring-boot-starter</artifactId>
  <version>0.1.1.RELEASE</version>
</dependency>

2.2 配置

ps: 需要mysql,zookeeper支持,请提前搭建好。

配置bootstrap.yml或者application.yml

加入以下配置:

spring:
  elasticjob:
    datasource: # job需要的记录数据源
      url: jdbc:mysql://127.0.0.1:3306/batch_log?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: Rtqw123OpnmER
    regCenter: # 注册中心
      serverList: 127.0.0.1:2181
      namespace: elasticJobDemo

2.3 定时器实现方法编写

创建定时器类(唯一不同的地方在于将@Scheduled改为实现SimpleJob接口即可)

定时器实现方法编写在execute方法里。

@Slf4j
@Component
public class MySimpleJob implements SimpleJob {
 
    //  @Scheduled(cron = "0 0/1 * * * ?")
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
        // 分片大致如下:根据配置的分片参数执行相应的逻辑
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}
log:Thread ID: 66, 作业分片总数: 1, 当前分片项: 0.当前参数: Beijing,作业名称: PropertiesSimpleJob.作业自定义参数: test

2.4 配置定时器

2.4.1 创建Configuration类

ZookeeperRegistryCenter和JobEventConfiguration注入。 创建JobScheduler @Bean(initMethod = "init")。 在mySimpleJobScheduler方法里先通过ElasticJobUtils#getLiteJobConfiguration获取LiteJobConfiguration对象。 创建SpringJobScheduler对象返回即可。

@Configuration
public class MyJobConfig {
 
    // job 名称
    private static final String JOB_NAME = "MySimpleJob";
 
    // 定时器cron参数
    private static final String CRON = "0 0/1 * * * ?";
 
    // 定时器分片
    private static final int SHARDING_TOTAL_COUNT = 1;
 
    // 分片参数
    private static final String SHARDING_ITEM_PARAMETERS = "0=Beijing,1=Shanghai,2=Guangzhou";
 
    // 自定义参数
    private static final String JOB_PARAMETERS = "parameter";
 
    @Resource
    private ZookeeperRegistryCenter regCenter;
 
    @Resource
    private JobEventConfiguration jobEventConfiguration;
 
 
    @Bean(initMethod = "init")
    public JobScheduler mySimpleJobScheduler(final MySimpleJob mySimpleJob) {
 
        LiteJobConfiguration liteJobConfiguration = ElasticJobUtils
                .getLiteJobConfiguration(mySimpleJob.getClass(), JOB_NAME, CRON,
                        SHARDING_TOTAL_COUNT, SHARDING_ITEM_PARAMETERS, JOB_PARAMETERS);
        // 参数:1.定时器实例,2.注册中心类,3.LiteJobConfiguration,
        //     3.历史轨迹(不需要可以省略)
        return new SpringJobScheduler(mySimpleJob, regCenter, liteJobConfiguration, jobEventConfiguration);
    }
 
}

ElasticJobUtils#getLiteJobConfiguration参数简介:

/**
     * 获取 {@link LiteJobConfiguration} 对象
     *
     * @param jobClass               定时器实现类
     * @param jobName                定时器名称
     * @param cron                   定时参数
     * @param shardingTotalCount     作业分片总数
     * @param shardingItemParameters 当前参数 可以为null
     * @param jobParameters          作业自定义参数 可以为null
     * @return {@link LiteJobConfiguration}
     */
  public static LiteJobConfiguration getLiteJobConfiguration(
      final Class<? extends SimpleJob> jobClass,
      final String jobName,
      final String cron,
      final int shardingTotalCount,
      final String shardingItemParameters,
      final String jobParameters) {
  ...
    return ...;
  }
2.4.2 简化Configuration类

当然也可以用下面的@Configuration实现简化,配置bootstrap.yml或者application.yml

spring:
  elasticjob:
    scheduled:
      jobConfigMap: // 为map集合
        PropertiesSimpleJob: // 定时器key名称
          jobName: PropertiesSimpleJob // job名称
          cron: 0 0/1 * * * ? // cron表达式
          shardingTotalCount: 2 // 分片数量
          shardingItemParameters: 0=123,1=332 // 分片参数
          jobParameters: test // 自定义参数

注入SpringJobSchedulerFactory,在propertiesSimpleJobScheduler方法里调用gerSpringJobScheduler方法即可。

@Configuration
public class PropertiesSimpleJobConfig {
 
    @Resource
    private SpringJobSchedulerFactory springJobSchedulerFactory;
 
    @Bean(initMethod = "init")
    public JobScheduler propertiesSimpleJobScheduler(final PropertiesSimpleJob job) {
        // 参数:1.定时器实例,2.配置名称,3.是否开启历史轨迹
        return springJobSchedulerFactory.getSpringJobScheduler(job,"PropertiesSimpleJob", true);
    }
 
}
2.4.3 注解方式配置(推荐方式)

ps:这个注解包含了上述方式,简化定时器注入。

继承SimpleJob实现方法execute

AnnotationSimpleJob类上加入注解@ElasticJobScheduler即可。

下面为完整注解。

@Slf4j
@ElasticJobScheduler(
        name = "AnnotationSimpleJob", // 定时器名称
        cron = "0/8 * * * * ?", // 定时器表达式
        shardingTotalCount = 1, // 作业分片总数 默认为1
        shardingItemParameters = "0=Beijing,1=Shanghai,2=Guangzhou",  // 分片序列号和参数用等号分隔 不需要参数可以不加
        jobParameters = "123", // 作业自定义参数 不需要参数可以不加
        isEvent = true // 是否开启数据记录 默认为true
)
public class AnnotationSimpleJob implements SimpleJob {
 
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s",
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
    }
}

总结

分布式job可以解决多个项目同一个定时器都执行的问题,配合elastic-job控制台可以直观监控定时器执行情况等。

示例代码地址:elastic-job-spring-boot

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
23天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
41 3
|
1月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
51 6
|
1月前
|
Java
springboot将list封装成csv文件
springboot将list封装成csv文件
39 4
|
2月前
|
Java API Spring
springBoot:注解&封装类&异常类&登录实现类 (八)
本文介绍了Spring Boot项目中的一些关键代码片段,包括使用`@PathVariable`绑定路径参数、创建封装类Result和异常处理类GlobalException、定义常量接口Constants、自定义异常ServiceException以及实现用户登录功能。通过这些代码,展示了如何构建RESTful API,处理请求参数,统一返回结果格式,以及全局异常处理等核心功能。
|
4月前
|
前端开发 小程序 Java
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
本文详细介绍了如何在SpringBoot项目中统一处理接口返回结果及全局异常。首先,通过封装`ResponseResult`类,实现了接口返回结果的规范化,包括状态码、状态信息、返回信息和数据等字段,提供了多种成功和失败的返回方法。其次,利用`@RestControllerAdvice`和`@ExceptionHandler`注解配置全局异常处理,捕获并友好地处理各种异常信息。
2036 0
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
|
4月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
这篇文章介绍了如何在SpringBoot项目中整合Redis,并探讨了缓存穿透、缓存雪崩和缓存击穿的问题以及解决方法。文章还提供了解决缓存击穿问题的加锁示例代码,包括存在问题和问题解决后的版本,并指出了本地锁在分布式情况下的局限性,引出了分布式锁的概念。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解、如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
|
3月前
|
运维 NoSQL Java
SpringBoot接入轻量级分布式日志框架GrayLog技术分享
在当今的软件开发环境中,日志管理扮演着至关重要的角色,尤其是在微服务架构下,分布式日志的统一收集、分析和展示成为了开发者和运维人员必须面对的问题。GrayLog作为一个轻量级的分布式日志框架,以其简洁、高效和易部署的特性,逐渐受到广大开发者的青睐。本文将详细介绍如何在SpringBoot项目中接入GrayLog,以实现日志的集中管理和分析。
297 1
|
4月前
|
Java 微服务 Spring
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
文章介绍了如何利用Spring Cloud Alibaba快速构建大型电商系统的分布式微服务,包括服务限流降级等主要功能的实现,并通过注解和配置简化了Spring Cloud应用的接入和搭建过程。
SpringBoot+Vue+Spring Cloud Alibaba 实现大型电商系统【分布式微服务实现】
|
4月前
|
JSON 前端开发 Java
SpringBoot3怎么做统一结果封装?
在Spring Boot应用中,统一结果封装有助于团队协作,确保一致的API响应格式,提升代码质量和维护性。主要优点包括:简化前端集成工作,减少后端重复编码,以及增强接口的可维护性。实现上,首先定义`Result`类来封装响应状态码、消息、数据及时间戳;其次,通过`ResultCode`枚举类标准化状态信息。示例代码展示了如何构建这些类,并通过一个简单的控制器方法演示了如何使用它们返回JSON格式的响应结果。
180 2