如何使用 SpringBoot SpringBatch Quartz 来整合定时批量任务呢 ?

简介: 我是小假 期待与你的下一次相遇 ~

1、pom文件

  1. <dependencies>
  2. <dependency>
  3.   <groupId>org.springframework.boot</groupId>
  4.   <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7.   <groupId>org.postgresql</groupId>
  8.   <artifactId>postgresql</artifactId>
  9. </dependency>
  10. <dependency>
  11.   <groupId>org.springframework.boot</groupId>
  12.   <artifactId>spring-boot-starter-jdbc</artifactId>
  13. </dependency>
  14. <dependency>
  15.   <groupId>org.springframework.boot</groupId>
  16.   <artifactId>spring-boot-starter-batch</artifactId>
  17. </dependency>
  18. <dependency>
  19.   <groupId>org.projectlombok</groupId>
  20.   <artifactId>lombok</artifactId>
  21. </dependency>
  22. <dependency>
  23.   <groupId>org.springframework.boot</groupId>
  24.   <artifactId>spring-boot-starter-batch</artifactId>
  25. </dependency>
  26. </dependencies>

2、application.yaml文件

  1. spring:
  2.  datasource:
  3.    username: thinklink
  4.    password: thinklink
  5.    url: jdbc:postgresql://172.16.205.54:5432/thinklink
  6.    driver-class-name: org.postgresql.Driver
  7.  batch:
  8.    job:
  9.      enabled: false
  10. server:
  11.  port: 8073
  12. #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
  13. upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
  14. # 每次批量处理的数据量,默认为5000
  15. batch-size: 5000

3、Service实现类,触发批处理任务的入口,执行一个job

  1. @Service("batchService")
  2. public class BatchServiceImpl implements BatchService {
  3. // 框架自动注入
  4.    @Autowired
  5.    private JobLauncher jobLauncher;
  6.    @Autowired
  7.    private Job updateDeviceJob;
  8.    /**
  9.     * 根据 taskId 创建一个Job
  10.     * @param taskId
  11.     * @throws Exception
  12.     */
  13.    @Override
  14.    public void createBatchJob(String taskId) throws Exception {
  15.        JobParameters jobParameters = new JobParametersBuilder()
  16.                .addString("taskId", taskId)
  17.                .addString("uuid", UUID.randomUUID().toString().replace("-",""))
  18.                .toJobParameters();
  19.        // 传入一个Job任务和任务需要的参数
  20.        jobLauncher.run(updateDeviceJob, jobParameters);
  21.    }
  22. }

4、SpringBatch配置类,此部分最重要

  1. @Configuration
  2. public class BatchConfiguration {
  3.    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
  4.    @Value("${batch-size:5000}")
  5.    private int batchSize;
  6. // 框架自动注入
  7.    @Autowired
  8.    public JobBuilderFactory jobBuilderFactory;
  9. // 框架自动注入
  10.    @Autowired
  11.    public StepBuilderFactory stepBuilderFactory;
  12. // 数据过滤器,对从数据库读出来的数据,注意进行操作
  13.    @Autowired
  14.    public TaskItemProcessor taskItemProcessor;
  15.    // 接收job参数
  16.    public Map<String, JobParameter> parameters;
  17.    public Object taskId;
  18.    @Autowired
  19.    private JdbcTemplate jdbcTemplate;
  20. // 读取数据库操作
  21.    @Bean
  22.    @StepScope
  23.    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
  24.        String querySql = " SELECT " +
  25.                " e. ID AS taskId, " +
  26.                " e.user_id AS userId, " +
  27.                " e.timing_startup AS startTime, " +
  28.                " u.device_id AS deviceId, " +
  29.                " d.app_name AS appName, " +
  30.                " d.compose_file AS composeFile, " +
  31.                " e.failure_retry AS failureRetry, " +
  32.                " e.tetry_times AS retryTimes, " +
  33.                " e.device_managered AS deviceManagered " +
  34.                " FROM " +
  35.                " eiot_upgrade_task e " +
  36.                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
  37.                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
  38.                " WHERE " +
  39.                " ( " +
  40.                " u.device_upgrade_status = 0 " +
  41.                " OR u.device_upgrade_status = 2" +
  42.                " )" +
  43.                " AND e.tetry_times > u.retry_times " +
  44.                " AND e. ID = ?";
  45.        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
  46.                .name("itemReader")
  47.                .sql(querySql)
  48.                .dataSource(dataSource)
  49.                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
  50.                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
  51.                .build();
  52.    }
  53. // 将结果写回数据库
  54.    @Bean
  55.    @StepScope
  56.    public ItemWriter<ProcessResult> itemWriter() {
  57.        return new ItemWriter<ProcessResult>() {
  58.            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
  59.                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
  60.                Integer retryTimes = jdbcTemplate.queryForObject(
  61.                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
  62.                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
  63.                );
  64.                retryTimes += 1;
  65.                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
  66.                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
  67.                if (updateCount <= 0) {
  68.                    log.warn("no task updated");
  69.                } else {
  70.                    log.info("count of {} task updated", updateCount);
  71.                }
  72.                // 最后一次重试
  73.                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
  74.                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
  75.                    return 1;
  76.                } else {
  77.                    return 0;
  78.                }
  79.            }
  80.            @Override
  81.            @Transactional
  82.            public void write(List<? extends ProcessResult> list) throws Exception {
  83.                Map taskMap = jdbcTemplate.queryForMap(
  84.                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
  85.                        list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的
  86.                        );
  87.                int deviceManagered = (int)taskMap.get("device_managered");
  88.                Integer deviceCount = (Integer) taskMap.get("device_count");
  89.                if (deviceCount == null) {
  90.                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
  91.                }
  92.                int taskStatus = (int)taskMap.get("task_status");
  93.                for (ProcessResult result: list) {
  94.                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
  95.                }
  96.                if (deviceCount != null && deviceManagered == deviceCount) {
  97.                    taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成
  98.                }
  99.                jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +
  100.                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
  101.            }
  102.        };
  103.    }
  104.    /**
  105.     * 定义一个下发更新的 job
  106.     * @return
  107.     */
  108.    @Bean
  109.    public Job updateDeviceJob(Step updateDeviceStep) {
  110.        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  111.                .listener(new JobListener()) // 设置Job的监听器
  112.                .flow(updateDeviceStep)// 执行下发更新的Step
  113.                .end()
  114.                .build();
  115.    }
  116.    /**
  117.     * 定义一个下发更新的 step
  118.     * @return
  119.     */
  120.    @Bean
  121.    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
  122.        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
  123.                .<DispatchRequest, ProcessResult> chunk(batchSize)
  124.                .reader(itemReader) //根据taskId从数据库读取更新设备信息
  125.                .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口
  126.                .writer(itemWriter)
  127.                .build();
  128.    }
  129.    // job 监听器
  130.    public class JobListener implements JobExecutionListener {
  131.        @Override
  132.        public void beforeJob(JobExecution jobExecution) {
  133.            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
  134.            parameters = jobExecution.getJobParameters().getParameters();
  135.            taskId = parameters.get("taskId").getValue();
  136.            log.info("job param taskId : " + parameters.get("taskId"));
  137.        }
  138.        @Override
  139.        public void afterJob(JobExecution jobExecution) {
  140.            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
  141.            // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job
  142.            String sql = " SELECT " +
  143.                    " count(*) " +
  144.                    " FROM " +
  145.                    " eiot_upgrade_device d " +
  146.                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
  147.                    " WHERE " +
  148.                    " u. ID = ? " +
  149.                    " AND d.retry_times < u.tetry_times " +
  150.                    " AND ( " +
  151.                    " d.device_upgrade_status = 0 " +
  152.                    " OR d.device_upgrade_status = 2 " +
  153.                    " ) ";
  154.            // 获取更新失败的设备个数
  155.            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  156.            log.info("update device failure count : " + count);
  157.            // 下面是使用Quartz触发定时任务
  158.            // 获取任务时间,单位秒
  159. // String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
  160.            // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒
  161.            Integer millSecond = 10;
  162.            if(count != null && count > 0){
  163.                String jobName = "UpgradeTask_" + taskId;
  164.                String reTaskId = taskId.toString();
  165.                Map<String,Object> params = new HashMap<>();
  166.                params.put("jobName",jobName);
  167.                params.put("taskId",reTaskId);
  168.                if (QuartzManager.checkNameNotExist(jobName))
  169.                {
  170.                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
  171.                }
  172.            }
  173.        }
  174.    }
  175. }

5、Processor,处理每条数据,可以在此对数据进行过滤操作

  1. @Component("taskItemProcessor")
  2. public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
  3.    public static final int STATUS_DISPATCH_FAILED = 2;
  4.    public static final int STATUS_DISPATCH_SUCC = 1;
  5.    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
  6.    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
  7.    private String dispatchUrl;
  8.    @Autowired
  9.    JdbcTemplate jdbcTemplate;
  10.    /**
  11.     * 在这里,执行 下发更新指令 的操作
  12.     * @param dispatchRequest
  13.     * @return
  14.     * @throws Exception
  15.     */
  16.    @Override
  17.    public ProcessResult process(final DispatchRequest dispatchRequest) {
  18.        // 调用接口,下发指令
  19.        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
  20.        log.info("request url:" + url);
  21.        RestTemplate restTemplate = new RestTemplate();
  22.        HttpHeaders headers = new HttpHeaders();
  23.        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
  24.        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
  25.        JSONObject jsonOuter = new JSONObject();
  26.        JSONObject jsonInner = new JSONObject();
  27.        try {
  28.            jsonInner.put("jobId",dispatchRequest.getTaskId());
  29.            jsonInner.put("name",dispatchRequest.getName());
  30.            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
  31.            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
  32.            jsonInner.put("timestamp",dispatchRequest.getTimestamp());
  33.            jsonOuter.put("method","updateApp");
  34.            jsonOuter.put("params",jsonInner);
  35.        } catch (JSONException e) {
  36.            log.info("JSON convert Exception :" + e);
  37.        }catch (IOException e) {
  38.            log.info("Base64Util bytesToBase64Str :" + e);
  39.        }
  40.        log.info("request body json :" + jsonOuter);
  41.        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
  42.        int status;
  43.        try {
  44.            ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
  45.            log.info("response :" + response);
  46.            if (response.getStatusCode() == HttpStatus.OK) {
  47.                status = STATUS_DISPATCH_SUCC;
  48.            } else {
  49.                status = STATUS_DISPATCH_FAILED;
  50.            }
  51.        }catch (Exception e){
  52.            status = STATUS_DISPATCH_FAILED;
  53.        }
  54.        return new ProcessResult(dispatchRequest, status);
  55.    }
  56. }

6、封装数据库返回数据的实体Bean,注意静态内部类

  1. public class DispatchRequest {
  2.    private String taskId;
  3.    private String deviceId;
  4.    private String userId;
  5.    private String name;
  6.    private byte[] composeFile;
  7.    private String policy;
  8.    private String timestamp;
  9.    private String md5;
  10.    private int failureRetry;
  11.    private int retryTimes;
  12.    private int deviceManagered;
  13.   // 省略构造函数,setter/getter/tostring方法
  14.   //......
  15.    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
  16.        @Override
  17.        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
  18.            DispatchRequest dispatchRequest = new DispatchRequest();
  19.            dispatchRequest.setTaskId(resultSet.getString("taskId"));
  20.            dispatchRequest.setUserId(resultSet.getString("userId"));
  21.            dispatchRequest.setPolicy(resultSet.getString("startTime"));
  22.            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
  23.            dispatchRequest.setName(resultSet.getString("appName"));
  24.            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
  25.            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
  26.            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
  27.            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
  28.            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
  29.            return dispatchRequest;
  30.        }
  31.    }
  32. }

7、启动类上要加上注解

  1. @SpringBootApplication
  2. @EnableBatchProcessing
  3. public class Application {
  4.    public static void main(String[] args) {
  5.        SpringApplication.run(Application.class, args);
  6.    }
  7. }


相关文章
|
28天前
|
Java Spring
使用 Spring Boot 多个定时任务阻塞问题的解决方案
我是小假 期待与你的下一次相遇 ~
|
6天前
|
缓存 大数据 PHP
PHP性能优化实战:告别缓慢脚本
PHP性能优化实战:告别缓慢脚本
160 89
|
24天前
|
缓存 前端开发 安全
解锁下一代 React:Server Components 实战指南
解锁下一代 React:Server Components 实战指南
127 84
|
24天前
|
前端开发
用 CSS Grid 轻松构建复杂布局
用 CSS Grid 轻松构建复杂布局
181 83
|
24天前
|
前端开发
轻松掌握 React Hooks:简化状态与副作用管理
轻松掌握 React Hooks:简化状态与副作用管理
140 80
|
24天前
|
前端开发 安全 JavaScript
掌握 React useEffect:避开三大高频陷阱,提升组件稳定性
掌握 React useEffect:避开三大高频陷阱,提升组件稳定性
147 78
|
12天前
|
数据库 对象存储
2025年 | 7月云大使推广奖励规则
云大使推广返利活动,企业新用户下单返佣加码5%,推广最高返佣45%,新老用户都可参与返利活动。
|
3天前
|
存储 自动驾驶 安全
USB‑C 式的工具联接:MCP 的模块化及通用标准探讨
本文探讨了 USB-C 接口与 MCP 模块化连接平台的结合及其标准化前景。USB-C 凭借高速传输、双向充电和正反插设计,已成为主流接口;而 MCP 通过模块化架构,提供灵活、可扩展的连接方案。两者融合不仅提升了设备互联的兼容性与效率,也为智能家居、移动办公、电动汽车等场景带来创新应用。未来,随着技术发展与标准统一,这一组合有望推动设备连接迈向更智能、通用的新时代。
46 27
USB‑C 式的工具联接:MCP 的模块化及通用标准探讨
|
1天前
|
SQL
SQL中如何将一列中的值显示出字符指定位置与指定长度
在对比系统生日与身份证号时,如何提取特定位置的值?例如,从身份证第7位开始取8位数字,可用SQL的`SUBSTRING`函数实现。
|
3天前
|
Java 索引
Java ArrayList中的常见删除操作及方法详解。
通过这些方法,Java `ArrayList` 提供了灵活而强大的操作来处理元素的移除,这些方法能够满足不同场景下的需求。
60 30