如何使用 SpringBoot SpringBatch Quartz 来整合定时批量任务呢 ?

简介: 我是小假 期待与你的下一次相遇 ~

1、pom文件

  1. <dependencies>
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7.   <groupId>org.postgresql</groupId>
  8.   <artifactId>postgresql</artifactId>
  9. </dependency>
  10. <dependency>
  11.   <groupId>org.springframework.boot</groupId>
  12.   <artifactId>spring-boot-starter-jdbc</artifactId>
  13. </dependency>
  14. <dependency>
  15.   <groupId>org.springframework.boot</groupId>
  16.   <artifactId>spring-boot-starter-batch</artifactId>
  17. </dependency>
  18. <dependency>
  19.   <groupId>org.projectlombok</groupId>
  20.   <artifactId>lombok</artifactId>
  21. </dependency>
  22. <dependency>
  23.   <groupId>org.springframework.boot</groupId>
  24.   <artifactId>spring-boot-starter-batch</artifactId>
  25. </dependency>
  26. </dependencies>

2、application.yaml文件

  1. spring:
  2.  datasource:
  3.    username: thinklink
  4.    password: thinklink
  5.    url: jdbc:postgresql://172.16.205.54:5432/thinklink
  6.    driver-class-name: org.postgresql.Driver
  7.  batch:
  8.    job:
  9.      enabled: false
  10. server:
  11.  port: 8073
  12. #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
  13. upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
  14. # 每次批量处理的数据量,默认为5000
  15. batch-size: 5000

3、Service实现类,触发批处理任务的入口,执行一个job

  1. @Service("batchService")
  2. public class BatchServiceImpl implements BatchService {
  3. // 框架自动注入
  4.    @Autowired
  5.    private JobLauncher jobLauncher;
  6.    @Autowired
  7.    private Job updateDeviceJob;
  8.    /**
  9.     * 根据 taskId 创建一个Job
  10.     * @param taskId
  11.     * @throws Exception
  12.     */
  13.    @Override
  14.    public void createBatchJob(String taskId) throws Exception {
  15.        JobParameters jobParameters = new JobParametersBuilder()
  16.                .addString("taskId", taskId)
  17.                .addString("uuid", UUID.randomUUID().toString().replace("-",""))
  18.                .toJobParameters();
  19.        // 传入一个Job任务和任务需要的参数
  20.        jobLauncher.run(updateDeviceJob, jobParameters);
  21.    }
  22. }

4、SpringBatch配置类,此部分最重要

  1. @Configuration
  2. public class BatchConfiguration {
  3.    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
  4.    @Value("${batch-size:5000}")
  5.    private int batchSize;
  6. // 框架自动注入
  7.    @Autowired
  8.    public JobBuilderFactory jobBuilderFactory;
  9. // 框架自动注入
  10.    @Autowired
  11.    public StepBuilderFactory stepBuilderFactory;
  12. // 数据过滤器,对从数据库读出来的数据,注意进行操作
  13.    @Autowired
  14.    public TaskItemProcessor taskItemProcessor;
  15.    // 接收job参数
  16.    public Map<String, JobParameter> parameters;
  17.    public Object taskId;
  18.    @Autowired
  19.    private JdbcTemplate jdbcTemplate;
  20. // 读取数据库操作
  21.    @Bean
  22.    @StepScope
  23.    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
  24.        String querySql = " SELECT " +
  25.                " e. ID AS taskId, " +
  26.                " e.user_id AS userId, " +
  27.                " e.timing_startup AS startTime, " +
  28.                " u.device_id AS deviceId, " +
  29.                " d.app_name AS appName, " +
  30.                " d.compose_file AS composeFile, " +
  31.                " e.failure_retry AS failureRetry, " +
  32.                " e.tetry_times AS retryTimes, " +
  33.                " e.device_managered AS deviceManagered " +
  34.                " FROM " +
  35.                " eiot_upgrade_task e " +
  36.                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
  37.                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
  38.                " WHERE " +
  39.                " ( " +
  40.                " u.device_upgrade_status = 0 " +
  41.                " OR u.device_upgrade_status = 2" +
  42.                " )" +
  43.                " AND e.tetry_times > u.retry_times " +
  44.                " AND e. ID = ?";
  45.        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
  46.                .name("itemReader")
  47.                .sql(querySql)
  48.                .dataSource(dataSource)
  49.                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
  50.                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
  51.                .build();
  52.    }
  53. // 将结果写回数据库
  54.    @Bean
  55.    @StepScope
  56.    public ItemWriter<ProcessResult> itemWriter() {
  57.        return new ItemWriter<ProcessResult>() {
  58.            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
  59.                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
  60.                Integer retryTimes = jdbcTemplate.queryForObject(
  61.                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
  62.                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
  63.                );
  64.                retryTimes += 1;
  65.                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
  66.                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
  67.                if (updateCount <= 0) {
  68.                    log.warn("no task updated");
  69.                } else {
  70.                    log.info("count of {} task updated", updateCount);
  71.                }
  72.                // 最后一次重试
  73.                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
  74.                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
  75.                    return 1;
  76.                } else {
  77.                    return 0;
  78.                }
  79.            }
  80.            @Override
  81.            @Transactional
  82.            public void write(List<? extends ProcessResult> list) throws Exception {
  83.                Map taskMap = jdbcTemplate.queryForMap(
  84.                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
  85.                        list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的
  86.                        );
  87.                int deviceManagered = (int)taskMap.get("device_managered");
  88.                Integer deviceCount = (Integer) taskMap.get("device_count");
  89.                if (deviceCount == null) {
  90.                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
  91.                }
  92.                int taskStatus = (int)taskMap.get("task_status");
  93.                for (ProcessResult result: list) {
  94.                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
  95.                }
  96.                if (deviceCount != null && deviceManagered == deviceCount) {
  97.                    taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成
  98.                }
  99.                jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +
  100.                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
  101.            }
  102.        };
  103.    }
  104.    /**
  105.     * 定义一个下发更新的 job
  106.     * @return
  107.     */
  108.    @Bean
  109.    public Job updateDeviceJob(Step updateDeviceStep) {
  110.        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  111.                .listener(new JobListener()) // 设置Job的监听器
  112.                .flow(updateDeviceStep)// 执行下发更新的Step
  113.                .end()
  114.                .build();
  115.    }
  116.    /**
  117.     * 定义一个下发更新的 step
  118.     * @return
  119.     */
  120.    @Bean
  121.    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
  122.        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  123.                .<DispatchRequest, ProcessResult> chunk(batchSize)
  124.                .reader(itemReader) //根据taskId从数据库读取更新设备信息
  125.                .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口
  126.                .writer(itemWriter)
  127.                .build();
  128.    }
  129.    // job 监听器
  130.    public class JobListener implements JobExecutionListener {
  131.        @Override
  132.        public void beforeJob(JobExecution jobExecution) {
  133.            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
  134.            parameters = jobExecution.getJobParameters().getParameters();
  135.            taskId = parameters.get("taskId").getValue();
  136.            log.info("job param taskId : " + parameters.get("taskId"));
  137.        }
  138.        @Override
  139.        public void afterJob(JobExecution jobExecution) {
  140.            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
  141.            // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job
  142.            String sql = " SELECT " +
  143.                    " count(*) " +
  144.                    " FROM " +
  145.                    " eiot_upgrade_device d " +
  146.                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
  147.                    " WHERE " +
  148.                    " u. ID = ? " +
  149.                    " AND d.retry_times < u.tetry_times " +
  150.                    " AND ( " +
  151.                    " d.device_upgrade_status = 0 " +
  152.                    " OR d.device_upgrade_status = 2 " +
  153.                    " ) ";
  154.            // 获取更新失败的设备个数
  155.            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  156.            log.info("update device failure count : " + count);
  157.            // 下面是使用Quartz触发定时任务
  158.            // 获取任务时间,单位秒
  159. // String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  160.            // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒
  161.            Integer millSecond = 10;
  162.            if(count != null && count > 0){
  163.                String jobName = "UpgradeTask_" + taskId;
  164.                String reTaskId = taskId.toString();
  165.                Map<String,Object> params = new HashMap<>();
  166.                params.put("jobName",jobName);
  167.                params.put("taskId",reTaskId);
  168.                if (QuartzManager.checkNameNotExist(jobName))
  169.                {
  170.                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
  171.                }
  172.            }
  173.        }
  174.    }
  175. }

5、Processor,处理每条数据,可以在此对数据进行过滤操作

  1. @Component("taskItemProcessor")
  2. public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
  3.    public static final int STATUS_DISPATCH_FAILED = 2;
  4.    public static final int STATUS_DISPATCH_SUCC = 1;
  5.    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
  6.    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
  7.    private String dispatchUrl;
  8.    @Autowired
  9.    JdbcTemplate jdbcTemplate;
  10.    /**
  11.     * 在这里,执行 下发更新指令 的操作
  12.     * @param dispatchRequest
  13.     * @return
  14.     * @throws Exception
  15.     */
  16.    @Override
  17.    public ProcessResult process(final DispatchRequest dispatchRequest) {
  18.        // 调用接口,下发指令
  19.        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
  20.        log.info("request url:" + url);
  21.        RestTemplate restTemplate = new RestTemplate();
  22.        HttpHeaders headers = new HttpHeaders();
  23.        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
  24.        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
  25.        JSONObject jsonOuter = new JSONObject();
  26.        JSONObject jsonInner = new JSONObject();
  27.        try {
  28.            jsonInner.put("jobId",dispatchRequest.getTaskId());
  29.            jsonInner.put("name",dispatchRequest.getName());
  30.            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
  31.            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
  32.            jsonInner.put("timestamp",dispatchRequest.getTimestamp());
  33.            jsonOuter.put("method","updateApp");
  34.            jsonOuter.put("params",jsonInner);
  35.        } catch (JSONException e) {
  36.            log.info("JSON convert Exception :" + e);
  37.        }catch (IOException e) {
  38.            log.info("Base64Util bytesToBase64Str :" + e);
  39.        }
  40.        log.info("request body json :" + jsonOuter);
  41.        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
  42.        int status;
  43.        try {
  44.            ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
  45.            log.info("response :" + response);
  46.            if (response.getStatusCode() == HttpStatus.OK) {
  47.                status = STATUS_DISPATCH_SUCC;
  48.            } else {
  49.                status = STATUS_DISPATCH_FAILED;
  50.            }
  51.        }catch (Exception e){
  52.            status = STATUS_DISPATCH_FAILED;
  53.        }
  54.        return new ProcessResult(dispatchRequest, status);
  55.    }
  56. }

6、封装数据库返回数据的实体Bean,注意静态内部类

  1. public class DispatchRequest {
  2.    private String taskId;
  3.    private String deviceId;
  4.    private String userId;
  5.    private String name;
  6.    private byte[] composeFile;
  7.    private String policy;
  8.    private String timestamp;
  9.    private String md5;
  10.    private int failureRetry;
  11.    private int retryTimes;
  12.    private int deviceManagered;
  13.   // 省略构造函数,setter/getter/tostring方法
  14.   //......
  15.    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
  16.        @Override
  17.        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
  18.            DispatchRequest dispatchRequest = new DispatchRequest();
  19.            dispatchRequest.setTaskId(resultSet.getString("taskId"));
  20.            dispatchRequest.setUserId(resultSet.getString("userId"));
  21.            dispatchRequest.setPolicy(resultSet.getString("startTime"));
  22.            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
  23.            dispatchRequest.setName(resultSet.getString("appName"));
  24.            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
  25.            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
  26.            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
  27.            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
  28.            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
  29.            return dispatchRequest;
  30.        }
  31.    }
  32. }

7、启动类上要加上注解

  1. @SpringBootApplication
  2. @EnableBatchProcessing
  3. public class Application {
  4.    public static void main(String[] args) {
  5.        SpringApplication.run(Application.class, args);
  6.    }
  7. }


相关文章
|
druid Java 数据库
Spring Boot的定时任务与异步任务
Spring Boot的定时任务与异步任务
|
4月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
571 2
|
4月前
|
人工智能 安全 Java
Spring Boot 中使用 Function 和异步线程池处理列表拆分任务并汇总结果
在Java开发中,处理大规模数据时常常需要将列表拆分为多个子列表进行异步处理并汇总结果。本文介绍如何在Spring Boot中使用Function和异步线程池实现高效且可维护的代码,涵盖结果封装、线程池配置、列表拆分处理及结果汇总等关键步骤。
212 0
|
分布式计算 大数据 Java
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
89 0
|
5月前
|
Java Spring
如何优雅的实现 SpringBoot 并行任务
我是小假 期待与你的下一次相遇 ~
143 1
|
10月前
|
Java 调度 数据库
SpringBoot整合XXL-JOB【05】- 任务分片
在实际业务中,批量定时任务可能因上一批任务未完成而影响业务。为解决此问题,本文介绍如何使用Xxl-job对批量任务进行分片处理,通过分片广播形式调度集群机器并行执行任务,大幅提升执行效率。具体步骤包括环境准备、添加依赖和配置、声明实体类与查询类,以及改造业务逻辑实现分片查询。测试结果显示,分片处理将两千条数据的执行时间从30秒缩短至15秒,性能提升显著。
1215 13
SpringBoot整合XXL-JOB【05】-  任务分片
|
10月前
|
前端开发 Java API
SpringBoot整合Flowable【07】- 驳回节点任务
本文通过绩效流程的业务场景,详细介绍了如何在Flowable工作流引擎中实现任务驳回功能。具体步骤包括:获取目标任务节点和当前任务节点信息,进行必要的判空和逻辑校验,调用API完成节点回退,并清理相关脏数据(如历史任务和变量)。最后通过测试验证了驳回功能的正确性,确保流程能够成功回退到指定节点并清除中间产生的冗余数据。此功能在实际业务中非常有用,能够满足上级驳回自评等需求。
1310 0
SpringBoot整合Flowable【07】- 驳回节点任务
|
消息中间件 缓存 监控
【Java笔记+踩坑】SpringBoot基础3——开发。热部署+配置高级+整合NoSQL/缓存/任务/邮件/监控
springboot的热部署、配置的宽松绑定和校验、任务、邮件、监控、springboot整合JdbcTemplate,h2等sql技术、整合redis,mongodb,es等nosql技术、整合redis,Memcached,jetcache,j2cache等缓存技术、整合ActiveMQ,RabbitMQ,RocketMQ,Kafka等消息的中间件的入门、整合缓存/任务/邮件/监控
【Java笔记+踩坑】SpringBoot基础3——开发。热部署+配置高级+整合NoSQL/缓存/任务/邮件/监控
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Java 数据安全/隐私保护
SpringBoot 自定义初始化任务 Runner
SpringBoot 自定义初始化任务 Runner
187 0
下一篇
oss云网关配置