概述
根据官网的描述,XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
背景
鉴于xxl-job官方并没有提供api的方式进行动态创建任务已经后台用api进行控制任务相关行为。基于次需求,本人编写了一个工具类,来实现以上需求。当然最主要的是还是xxl-job的restful api 设计,这才得益于我可以编一个http请求的工具类来实现
版本
- 基于 xxl-job 2.3.0
- springboot 2.6.13
特性
1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手; 2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效; 3、调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA; 4、执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA; 5、注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址; 6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务; 7、触发策略:提供丰富的任务触发策略,包括:Cron触发、固定间隔触发、固定延时触发、API(事件)触发、人工触发、父子任务触发; 8、调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等; 9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度; 10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务; 11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试; 12、任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式; 13、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等; 14、分片广播任务:执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务; 15、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。 16、故障转移:任务路由策略选择”故障转移”情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。 17、任务进度监控:支持实时监控任务进度; 18、Rolling实时日志:支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志; 19、GLUE:提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。 20、脚本任务:支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS、PHP、PowerShell等类型脚本; 21、命令行任务:原生提供通用命令行任务Handler(Bean任务,”CommandJobHandler”);业务方只需要提供命令行即可; 22、任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔; 23、一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行; 24、自定义任务参数:支持在线配置调度任务入参,即时生效; 25、调度线程池:调度系统多线程触发调度运行,确保调度精确执行,不被堵塞; 26、数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性; 27、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件; 28、推送maven中央仓库: 将会把最新稳定版推送到maven中央仓库, 方便用户接入和使用; 29、运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等; 30、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行; 31、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式”和“httpJobHandler”等其他跨语言方案; 32、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文; 33、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用; 34、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入”Slow”线程池,避免耗尽调度线程,提高系统稳定性; 35、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色; 36、权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;
HTTP 工具
引入依赖
<properties> <fastjson.version>2.0.33</fastjson.version> </properties> <dependencies> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies>
创建相关类
- 创建一个
bean
,与XxlJobInfoBO
基本类似,主要当作接收参数转换使用
package com.example.jobdemo; /** * @Author zuiyu 创建任务,API 接口 使用 * @Date 2023/6/2 17:20 */ public class XxlJobInfoBO { private int id; // 主键ID private int jobGroup; // 执行器主键ID private String jobDesc; private String author; // 负责人 private String alarmEmail; // 报警邮件 private String scheduleType; // 调度类型 private String scheduleConf; private String executorHandler; // 执行器,任务Handler名称 private String executorParam; // 执行器,任务参数 private String executorRouteStrategy; // 执行器路由策略 private String misfireStrategy; // 调度过期策略 private String executorBlockStrategy; // 阻塞处理策略 private int executorTimeout; // 任务执行超时时间,单位秒 private int executorFailRetryCount; // 失败重试次数 private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum private String childJobId; // 子任务ID,多个逗号分隔 private String glueSource; // GLUE源代码 private String glueRemark; // GLUE备注 public String getGlueType() { return glueType; } public void setGlueType(String glueType) { this.glueType = glueType; } public String getChildJobId() { return childJobId; } public void setChildJobId(String childJobId) { this.childJobId = childJobId; } public String getGlueSource() { return glueSource; } public void setGlueSource(String glueSource) { this.glueSource = glueSource; } public String getGlueRemark() { return glueRemark; } public void setGlueRemark(String glueRemark) { this.glueRemark = glueRemark; } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getJobGroup() { return jobGroup; } public void setJobGroup(int jobGroup) { this.jobGroup = jobGroup; } public String getJobDesc() { return jobDesc; } public void setJobDesc(String jobDesc) { this.jobDesc = jobDesc; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public String getAlarmEmail() { return alarmEmail; } public void setAlarmEmail(String alarmEmail) { this.alarmEmail = alarmEmail; } public String getScheduleType() { return scheduleType; } public void setScheduleType(String scheduleType) { this.scheduleType = scheduleType; } public String getScheduleConf() { return scheduleConf; } public void setScheduleConf(String scheduleConf) { this.scheduleConf = scheduleConf; } public String getExecutorHandler() { return executorHandler; } public void setExecutorHandler(String executorHandler) { this.executorHandler = executorHandler; } public String getExecutorParam() { return executorParam; } public void setExecutorParam(String executorParam) { this.executorParam = executorParam; } public String getExecutorRouteStrategy() { return executorRouteStrategy; } public void setExecutorRouteStrategy(String executorRouteStrategy) { this.executorRouteStrategy = executorRouteStrategy; } public String getMisfireStrategy() { return misfireStrategy; } public void setMisfireStrategy(String misfireStrategy) { this.misfireStrategy = misfireStrategy; } public String getExecutorBlockStrategy() { return executorBlockStrategy; } public void setExecutorBlockStrategy(String executorBlockStrategy) { this.executorBlockStrategy = executorBlockStrategy; } public int getExecutorTimeout() { return executorTimeout; } public void setExecutorTimeout(int executorTimeout) { this.executorTimeout = executorTimeout; } public int getExecutorFailRetryCount() { return executorFailRetryCount; } public void setExecutorFailRetryCount(int executorFailRetryCount) { this.executorFailRetryCount = executorFailRetryCount; } }
- 创建一个参数类,接收
springboot
配置文件中的配置参数值,当然,最重要的就是调度中心地址的配置
package com.example.jobdemo; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; /** * @Author zuiyu * @Date 2023/6/8 11:42 */ @Configuration public class XxlJobClientConfigProperties { @Value("${xxl.job.client.loginUrl:${xxl.job.admin.addresses}/login?userName=admin&password=123456}") private String loginUrl; @Value("${xxl.job.client.addUrl:${xxl.job.admin.addresses}/jobinfo/add}") private String jobInfoAddUrl; @Value("${xxl.job.client.deleteUrl:${xxl.job.admin.addresses}/jobinfo/remove?id=%s}") private String jobInfoDeleteUrl; @Value("${xxl.job.client.startJobUrl:${xxl.job.admin.addresses}/jobinfo/start?id=%s}") private String jobInfoStartJobUrl; @Value("${xxl.job.client.stopJobUrl:${xxl.job.admin.addresses}/jobinfo/stop?id=%s}") private String jobInfoStopJobUrl; @Value("${xxl.job.client.updateUrl:${xxl.job.admin.addresses}/jobinfo/update}") private String jobInfoUpdateUrl; @Value("${xxl.job.client.loadByIdUrl:${xxl.job.admin.addresses}/jobinfo/loadById/%s}") private String jobInfoLoadByIdUrl; /** * 任务列表 */ @Value("${xxl.job.client.jobInfoPageListUrl:${xxl.job.admin.addresses}/jobinfo/pageList}") private String jobInfoPageListUrl; /** * 执行器列表 */ @Value("${xxl.job.client.jobGroupPageListUrl:${xxl.job.admin.addresses}/jobgroup/pageList}") private String jobGroupPageListUrl; /** * 执行器创建 URL */ @Value("${xxl.job.client.jobGroupSaveUrl:${xxl.job.admin.addresses}/jobgroup/save") private String jobGroupSaveUrl; public String getLoginUrl() { return loginUrl; } public void setLoginUrl(String loginUrl) { this.loginUrl = loginUrl; } public String getJobInfoAddUrl() { return jobInfoAddUrl; } public void setJobInfoAddUrl(String jobInfoAddUrl) { this.jobInfoAddUrl = jobInfoAddUrl; } public String getJobInfoDeleteUrl() { return jobInfoDeleteUrl; } public void setJobInfoDeleteUrl(String jobInfoDeleteUrl) { this.jobInfoDeleteUrl = jobInfoDeleteUrl; } public String getJobInfoStartJobUrl() { return jobInfoStartJobUrl; } public void setJobInfoStartJobUrl(String jobInfoStartJobUrl) { this.jobInfoStartJobUrl = jobInfoStartJobUrl; } public String getJobInfoStopJobUrl() { return jobInfoStopJobUrl; } public void setJobInfoStopJobUrl(String jobInfoStopJobUrl) { this.jobInfoStopJobUrl = jobInfoStopJobUrl; } public String getJobInfoUpdateUrl() { return jobInfoUpdateUrl; } public void setJobInfoUpdateUrl(String jobInfoUpdateUrl) { this.jobInfoUpdateUrl = jobInfoUpdateUrl; } public String getJobInfoLoadByIdUrl() { return jobInfoLoadByIdUrl; } public void setJobInfoLoadByIdUrl(String jobInfoLoadByIdUrl) { this.jobInfoLoadByIdUrl = jobInfoLoadByIdUrl; } public String getJobInfoPageListUrl() { return jobInfoPageListUrl; } public void setJobInfoPageListUrl(String jobInfoPageListUrl) { this.jobInfoPageListUrl = jobInfoPageListUrl; } public String getJobGroupPageListUrl() { return jobGroupPageListUrl; } public void setJobGroupPageListUrl(String jobGroupPageListUrl) { this.jobGroupPageListUrl = jobGroupPageListUrl; } public String getJobGroupSaveUrl() { return jobGroupSaveUrl; } public void setJobGroupSaveUrl(String jobGroupSaveUrl) { this.jobGroupSaveUrl = jobGroupSaveUrl; } @Override public String toString() { return "XxlJobClientConfigProperties{" + "loginUrl='" + loginUrl + '\'' + ", jobInfoAddUrl='" + jobInfoAddUrl + '\'' + ", jobInfoDeleteUrl='" + jobInfoDeleteUrl + '\'' + ", jobInfoStartJobUrl='" + jobInfoStartJobUrl + '\'' + ", jobInfoStopJobUrl='" + jobInfoStopJobUrl + '\'' + ", jobInfoUpdateUrl='" + jobInfoUpdateUrl + '\'' + ", jobInfoLoadByIdUrl='" + jobInfoLoadByIdUrl + '\'' + ", jobInfoPageListUrl='" + jobInfoPageListUrl + '\'' + ", jobGroupPageListUrl='" + jobGroupPageListUrl + '\'' + '}'; } }
- 创建核心调用类
XxlJobClient
package com.example.jobdemo; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import org.apache.commons.httpclient.*; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; /** * @Author zuiyu * @Date 2023/5/31 11:12 */ @Component public class XxlJobClient { private String COOKIE =""; private HttpClient httpClient; private static final String POST_FORM_CONTENT_TYPE = "application/x-www-form-urlencoded; charset=UTF-8"; private static final Header POST_FORM_CONTENT_TYPE_HEADER = new Header("Content-Type",POST_FORM_CONTENT_TYPE); @Autowired private XxlJobClientConfigProperties clientConfigProperties; private final Logger log = LoggerFactory.getLogger(getClass()); @PostConstruct public void init() throws IOException { log.debug("xxl JOB 初始化配置:{}",clientConfigProperties.toString()); httpClient = new HttpClient(); login(); } /** * 登录获取 cookie * @throws IOException */ private void login() throws IOException { HttpMethod postMethod = new PostMethod(clientConfigProperties.getLoginUrl()); httpClient.executeMethod(postMethod); if (postMethod.getStatusCode() == 200) { Cookie[] cookies = httpClient.getState().getCookies(); StringBuilder tmpCookies = new StringBuilder(); for (Cookie c : cookies) { tmpCookies.append(c.toString()).append(";"); } COOKIE = tmpCookies.toString(); log.debug("xxlJob 登录成功"); }else { log.debug("xxlJob 登录失败:{}",postMethod.getStatusCode()); } } /** * 创建任务 * @param params * @return * @throws IOException */ public JSONObject createJob(JSONObject params) throws IOException { return doPost(clientConfigProperties.getJobInfoAddUrl(), params); } /** * 更新任务 * @param params * @return * @throws IOException */ public JSONObject updateJob(JSONObject params) throws IOException { return doPost(clientConfigProperties.getJobInfoUpdateUrl(), params); } /** * 根据任务 ID 加载 * @param id * @return * @throws IOException */ public JSONObject loadById(int id) throws IOException { log.info("loadById: {}",id); return doGet(String.format(clientConfigProperties.getJobInfoLoadByIdUrl(),id)); } /** * 删除任务 * @param id 任务 ID * @return * @throws IOException */ public JSONObject deleteJob(int id) throws IOException { log.info("deleteJob: {}",id); return doGet(String.format(clientConfigProperties.getJobInfoDeleteUrl(),id)); } /** * 开启任务 * @param id 任务 ID * @return * @throws IOException */ public JSONObject startJob(int id) throws IOException { log.info("startJob: {}",id); return doGet(String.format(clientConfigProperties.getJobInfoStartJobUrl(),id)); } /** * 停止任务 * @param id 任务 ID * @return * @throws IOException */ public JSONObject stopJob(int id) throws IOException { log.info("stopJob: {}",id); return doGet(String.format(clientConfigProperties.getJobInfoStopJobUrl(),id)); } /** * 创建执行器 * @param params * @return * @throws IOException */ public JSONObject createJobGroup(JSONObject params) throws IOException { return doPost(clientConfigProperties.getJobGroupSaveUrl(), params); } /** * 执行器列表 * @param params * @return * @throws IOException */ public JSONObject jobGroupPageList(JSONObject params) throws IOException { params.put("start",Optional.ofNullable(params.getInteger("start")).orElse(0)); params.put("length", Optional.ofNullable(params.getInteger("length")).orElse(10)); return doPost(clientConfigProperties.getJobGroupPageListUrl(),params); } /** * 任务列表 * @param params * @return * @throws IOException */ public JSONObject jobInfoPageList(JSONObject params) throws IOException { params.put("start",Optional.ofNullable(params.getInteger("start")).orElse(0)); params.put("length", Optional.ofNullable(params.getInteger("length")).orElse(10)); return doPost(clientConfigProperties.getJobInfoPageListUrl(),params); } /** * 发起 GET 请求 * @param url * @return * @throws IOException */ private JSONObject doGet(String url) throws IOException { GetMethod get = new GetMethod(url); get.setRequestHeader("cookie", COOKIE); httpClient.executeMethod(get); return readResponse(get); } /** * post 请求 * @param url * @param params * @return * @throws IOException */ private JSONObject doPost(String url,JSONObject params) throws IOException { PostMethod post = new PostMethod(url); post.setRequestHeader("cookie", COOKIE); List<NameValuePair> pairList = new ArrayList<>(); params.forEach((k,v)-> pairList.add(new NameValuePair(k, v.toString()))); NameValuePair[] arr = pairList.toArray(new NameValuePair[0]); post.setRequestBody(arr); post.setRequestHeader(POST_FORM_CONTENT_TYPE_HEADER); httpClient.executeMethod(post); return readResponse(post); } /** * 处理响应内容 * @param httpMethod * @return * @throws IOException */ private JSONObject readResponse(HttpMethod httpMethod) { if (httpMethod.getStatusCode() == HttpStatus.SC_OK) { try (InputStream inputStream = httpMethod.getResponseBodyAsStream(); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { return JSON.parseObject(bufferedReader.lines().collect(Collectors.joining(System.lineSeparator()))); } catch (IOException e) { log.error("读取响应失败:{}", e.getMessage(), e); JSONObject error = new JSONObject(); error.put("code",HttpStatus.SC_INTERNAL_SERVER_ERROR); error.put("msg","响应内容读取失败:"+e.getMessage()); return error; } } return new JSONObject(); } }
application.yml
配置
xxl: job: admin: addresses: http://192.168.80.88:8761/xxl-job-admin client: userName: admin password: 123456 loginUrl: ${xxl.job.admin.addresses}/login?userName=${xxl.job.client.userName}&password=${xxl.job.client.password} addUrl: ${xxl.job.admin.addresses}/jobinfo/add deleteUrl: ${xxl.job.admin.addresses}/jobinfo/remove?id=%s startJobUrl: ${xxl.job.admin.addresses}/jobinfo/start?id=%s stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stop?id=%s
示例
- 创建任务
XxlJobInfoBO info = new XxlJobInfoBO(); JSONObject executorParams = new JSONObject(); executorParams.put("id",id); info.setExecutorParam(JSONObject.toJSONString(executorParams)); info.setScheduleConf(cronExpression); // 省略其他参数 JSONObject jobInfo = JSONObject.parseObject(com.alibaba.fastjson2.JSON.toJSONString(info)); JSONObject creatResult = xxlJobClient.createJob(jobInfo);
- 开启任务
final String xxlJobAdminContentKey = "content"; Integer jobId = creatResult.getInteger(xxlJobAdminContentKey); xxlJobClient.startJob(jobId);
总结
在使用时,直接注入XxlJobClient的bean对象,然后封装调用参数即可,具体参考不同版本可能会有变化,具体以使用版本为主。如遇到问题欢迎评论区留言。感觉对你有帮助的话欢迎点赞关注转发。