背景
本场景需要往数据库调度任务表或者从后台页面自行配置相关参数,实现动态可控的定时任务,定时调起linkis服务。
linkis restApi接口介绍
提交任务接口
接口 /api/rest_j/v1/entrance/submit 提交方式 POST
spark引擎提交
{ "executionContent": { "code": "show databases", "runType": "sql" // 运行的脚本类型 sql, py(pyspark), scala, data_calc }, "params": { "variable": { // 自定义参数,做变量替换 "execDateTime": "2022-04-21 12:10:00", // 下面两个变量可废弃 "dtDay":"2022-04-21", "dtHour":"14", }, "configuration": { // 非必填 "startup": { // 指定集群资源参数 "spark.executor.memory": "1g", "spark.driver.memory": "1g", "spark.executor.cores": "1", "spark.executor.instances": 1, // 是否加载用户所有 udf,调度系统固定 false,按需加载 "user.udf.all.load": false, // udf 的 id 列表,多个用英文逗号分隔 "user.udf.custom.ids": "48,12,5" }, // 非必填,运行时参数 "runtime": { } } }, "source": { // 可选 "scriptPath": "file:///mnt/bdp/hadoop/xxx.sql" // file:///或者hdfs:/// }, "labels": { "engineType": "spark-2.4.7", // userCreator: dip 是任务提交的用户所属组织code,yarn 的资源用户。 // IDE 是系统名,在 Linkis 后台管理,含义:dip 用户 在 IDE 上所用资源。 "userCreator": "dip-IDE", // 非必填,执行一次,就是计算完成后关闭当前引擎(如 Spark )实例,不复用原有实例,任务调度使用 "executeOnce": "true", // 资源不足时,重试次数 "jobRetryCount": 30, // 资源不足时,重试间隔,单位ms "jobRetryTimeout": 60000, // 非必填,指定特定机器执行 "emInstance": "linkis-cg-engineconnmanager-172.21.213.155:9102" } }
jdbc方式提交
{ "executionContent": { "code": "select * from cxo_vehicle_mile limit 10", "runType": "jdbc" }, "params": { "variable": { // 参考 Spark 类型 }, "configuration": { "runtime": { "jdbc.url": "jdbc:mysql://...", "jdbc.username": "xxx", "jdbc.password": "xxx" } } }, "labels": { "engineType": "jdbc-4", "userCreator": "linkis-IDE" } }
python形式提交
{ "executionContent": { "code": "print(100)", "runType": "python" }, "params": { "variable": { // 参考 Spark 类型 }, "configuration": { // 非必填 "startup": { // 用户自定义python的hdfs路径,现阶段仅支持.tar.gz包 "spark.user.hdfs.python": "hdfs:///linkis_test/bml/default/python.tar.gz", // 用户自定义python包解压之后的python执行程序的相对路径 "spark.user.hdfs.python.rp": "/python/bin/python3" }, "runtime": { } } }, "labels": { "engineType": "python-python3", "userCreator": "linkis-IDE" } }
hive引擎提交
{ "executionContent": { "code": "show databases", "runType": "hql" }, "params": { "variable": { // 参考 Spark 类型 }, "configuration": { "startup": { // Yarn 上 AppName "hive.session.id": "yarn_app_name" }, "runtime": null } }, "source": null, "labels": { "engineType": "hive-1.2.1", "userCreator": "dip_bp-IDE", "executeOnce": "true" } }
presto
{ "executionContent": {"code": "show teblas;", "runType": "psql"}, "params": { "variable": {}, "configuration": { "runtime": { "wds.linkis.presto.url":"http://127.0.0.1:9090", "wds.linkis.presto.catalog ":"hive", "wds.linkis.presto.schema ":"default", "wds.linkis.presto.source ":"" } } }, "source": {"scriptPath": "file:///mnt/bdp/hadoop/1.sql"}, "labels": { "engineType": "presto-0.234", "userCreator": "hadoop-IDE" } }
trnio
{ "executionContent": {"code": "select * from system.jdbc.schemas limit 10;", "runType": "sql"}, "params": { "variable": {}, "configuration": { "runtime": { "linkis.trino.url":"http://127.0.0.1:8080", "linkis.trino.catalog ":"hive", "linkis.trino.schema ":"default" } } }, "labels": { "engineType": "trino-371", "userCreator": "hadoop-IDE" } }
Seatunnel
{ "executionContent": {"code": 'env { spark.app.name = "SeaTunnel" spark.executor.instances = 2 spark.executor.cores = 1 spark.executor.memory = "1g" } source { Fake { result_table_name = "my_dataset" } } transform {} sink {Console {}}', "runType": "sql"}, "params": { "variable": {}, "configuration": { "runtime": { "wds.linkis.engine.seatunnel.plugin.home":"/opt/linkis/seatunnel" } } }, "labels": { "engineType": "seatunnel-2.1.2", "userCreator": "hadoop-IDE" } }
获取状态
接口 /api/rest_j/v1/entrance/${execID}/status 提交方式 GET
返回示例:
{ "method": "/api/rest_j/v1/entrance/{execID}/status", "status": 0, "message": "获取状态成功", "data": { "execID": "${execID}", "status": "Running" } }
获取日志
接口 /api/rest_j/v1/entrance/${execID}/log?fromLine=${fromLine}&size=${size} 提交方式 GET
请求参数fromLine是指从第几行开始获取,size是指该次请求获取几行日志 返回示例,其中返回的fromLine需要作为下次请求该接口的参数
返回示例:
{ "method": "/api/rest_j/v1/entrance/${execID}/log", "status": 0, "message": "返回日志信息", "data": { "execID": "${execID}", "log": ["error日志","warn日志","info日志", "all日志"], "fromLine": 56 } }
获取进度
接口 /api/rest_j/v1/entrance/${execID}/progress 提交方式 GET
返回示例:
{ "method": "/api/rest_j/v1/entrance/{execID}/progress", "status": 0, "message": "返回进度信息", "data": { "execID": "${execID}", "progress": 0.2, "progressInfo": [ { "id": "job-1", "succeedTasks": 2, "failedTasks": 0, "runningTasks": 5, "totalTasks": 10 }, { "id": "job-2", "succeedTasks": 5, "failedTasks": 0, "runningTasks": 5, "totalTasks": 10 } ] } }
获取历史任务信息
接口 /api/rest_j/v1/jobhistory/{id}/get 提交方式 GET
请求参数:
返回示例:
{ "method": null, "status": 0, "message": "OK", "data": { "task": { "taskID": 1, "instance": "xxx", "execId": "exec-id-xxx", "umUser": "test", "engineInstance": "xxx", "progress": "10%", "logPath": "hdfs://xxx/xxx/xxx", "resultLocation": "hdfs://xxx/xxx/xxx", "status": "FAILED", "createdTime": "2019-01-01 00:00:00", "updatedTime": "2019-01-01 01:00:00", "engineType": "spark", "errorCode": 100, "errDesc": "Task Failed with error code 100", "executeApplicationName": "hello world", "requestApplicationName": "hello world", "runType": "xxx", "paramJson": "{\"xxx\":\"xxx\"}", "costTime": 10000, "strongerExecId": "execId-xxx", "sourceJson": "{\"xxx\":\"xxx\"}" } } }
获取结果集信息
接口 /api/rest_j/v1/filesystem/getDirFileTrees 提交方式 GET
请求参数:
返回示例:
{ "method": "/api/filesystem/getDirFileTrees", "status": 0, "message": "OK", "data": { "dirFileTrees": { "name": "1946923", "path": "hdfs:///tmp/hadoop/linkis/2022-07-06/211446/IDE/1946923", "properties": null, "children": [ { "name": "_0.dolphin", "path": "hdfs:///tmp/hadoop/linkis/2022-07-06/211446/IDE/1946923/_0.dolphin",//result set 1 "properties": { "size": "7900", "modifytime": "1657113288360" }, "children": null, "isLeaf": true, "parentPath": "hdfs:///tmp/hadoop/linkis/2022-07-06/211446/IDE/1946923" }, { "name": "_1.dolphin", "path": "hdfs:///tmp/hadoop/linkis/2022-07-06/211446/IDE/1946923/_1.dolphin",//result set 2 "properties": { "size": "7900", "modifytime": "1657113288614" }, "children": null, "isLeaf": true, "parentPath": "hdfs:///tmp/hadoop/linkis/2022-07-06/211446/IDE/1946923" } ], "isLeaf": false, "parentPath": null } } }
获取结果集内容
接口 /api/rest_j/v1/filesystem/openFile 提交方式 GET
请求参数:
返回示例:
{ "method": "/api/filesystem/openFile", "status": 0, "message": "OK", "data": { "metadata": [ { "columnName": "count(1)", "comment": "NULL", "dataType": "long" } ], "totalPage": 0, "totalLine": 1, "page": 1, "type": "2", "fileContent": [ [ "28" ] ] } }
linkis 任务提交代码
linkisService层代
@Service("monitorLinkisService") public class MonitorLinkisServiceImp implements IMonitorLinkisService { private final static Logger LOGGER = LoggerFactory.getLogger(MonitorLinkisServiceImp.class); @Resource RestTemplate restTemplate; final IMonitorHistoryService monitorHistoryService; final IMonitorRuleResultService monitorRuleResultService; public MonitorLinkisServiceImp(IMonitorHistoryService monitorHistoryService, IMonitorRuleResultService monitorRuleResultService) { this.monitorHistoryService = monitorHistoryService; this.monitorRuleResultService = monitorRuleResultService; } @Override public JSONObject submit_1x(String sql, String name, String tokenUser, CommonConstant.ExecType execType) { JSONObject body = new JSONObject(); JSONObject params = new JSONObject(); JSONObject variableParams = new JSONObject(); JSONObject source = new JSONObject(); JSONObject executionContent = new JSONObject(); JSONObject labels = new JSONObject(); JSONObject configuration = new JSONObject(); JSONObject startup = new JSONObject(); JSONObject runtime= new JSONObject(); executionContent.put("code",sql); executionContent.put("runType", "sql"); variableParams.put("execDateTime", "2022-04-21 12:10:00"); variableParams.put("dtDay", "2022-04-21"); variableParams.put("dtHour", "14"); // startup.put("spark.executor.memory", "1g"); // startup.put("spark.driver.memory", "1g"); // startup.put("spark.executor.cores", "1"); // startup.put("spark.executor.instances", 1); // startup.put("user.udf.all.load", "false"); // startup.put("spark.udf.all.load","48,12,5"); configuration.put("startup",startup); configuration.put("runtime",runtime); params.put("variable", variableParams); params.put("configuration", configuration); runtime.put("jdbc.url","jdbc:mysql://localhost:3306/flink_drools?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=Asia/Shanghai"); runtime.put("jdbc.username","user"); runtime.put("jdbc.password","Mdz6900520"); params.put("runtime", runtime); labels.put("engineType", "spark-2.4.7"); labels.put("userCreator", "dip_bp-IDE"); labels.put("executeOnce", "true"); labels.put("jobRetryCount", 30); labels.put("jobRetryTimeout", 60000); labels.put("emInstance", links-cg-engineconnmanager-xxx.xxx.xx.xx:9092); body.put("executionContent", executionContent); body.put("params", params); body.put("source", source); body.put("labels", labels); LinkisResponse res = this.linkisRestAPI(CommonConstant.LinkisRestApiEnum.submit, HttpMethod.POST, body, tokenUser, execType); if (res.getStatus() != CommonConstant.LinkisResponseStatus.OK.getValue()) { LOGGER.error("[linkis-submit1x-response] 提交任务失败!linkis响应状态:[{}],错误信息:[{}]", CommonConstant.getDescByStatusForLinkisResponseStatus(res.getStatus()), res.getMessage()); throw new DmpException(String.format("[linkis-submit1x-response] 提交任务失败!linkis响应状态:[%s],错误信息:[%s]", CommonConstant.getDescByStatusForLinkisResponseStatus(res.getStatus()), res.getMessage())); } JSONObject data = res.getData(); System.out.println(data); LOGGER.info("[linkis-submit1x-sql] 提交SQL成功!结果:[{}]", data.toJSONString()); return data; } /** * linkis通用调用方式 * * @param api * @param httpMethod * @param body * @param uriVariables * @return */ private LinkisResponse linkisRestAPI(CommonConstant.LinkisRestApiEnum api, HttpMethod httpMethod, Object body, String tokenUser, CommonConstant.ExecType execType, String... uriVariables) { String uri; switch (api) { case kill: uri = "/api/rest_j/v1/entrance/{execID}/kill"; break; case log: uri = "/api/rest_j/v1/entrance/{execID}/log?fromLine={fromLine}&size={size}"; break; case get: uri = "/api/rest_j/v1/jobhistory/{taskID}/get"; break; case status: uri = "/api/rest_j/v1/entrance/{execID}/status"; break; case progress: uri = "/api/rest_j/v1/entrance/{execID}/progress"; break; case openFile: uri = "/api/rest_j/v1/filesystem/openFile?path={path}"; break; case execute: uri = "/api/rest_j/v1/entrance/execute"; break; case submit: uri = "/api/rest_j/v1/entrance/submit"; break; case openLog: uri = "/api/rest_j/v1/filesystem/openLog?path={path}&proxyUser={proxyUser}"; break; default: throw new DmpException("暂不支持!"); } String url = "http://ip:port"+ uri; Map<String, String> headerMap = new HashMap<>(); headerMap.put("token-code", "TEST-AUTH"); headerMap.put("token-user", "test"); HttpHeaders headers = new HttpHeaders(); headers.setAll(headerMap); HttpEntity httpEntity = new HttpEntity(body, headers); LOGGER.info("[linkisRestAPI] header: {} body: {}", JSONObject.toJSONString(headerMap), JSONObject.toJSONString(body)); ResponseEntity<String> res = restTemplate.exchange(url, httpMethod, httpEntity, String.class, uriVariables); System.out.println(res); int statusCode = res.getStatusCodeValue(); LinkisResponse linkisResponse = JSON.parseObject(res.getBody(), LinkisResponse.class); linkisResponse.setHttpStatusCode(statusCode); return linkisResponse; } }
JobService层
@Service("monitorJobService") public class MonitorJobServiceImpl extends ServiceImpl<MonitorJobMapper, MonitorJob> implements IMonitorJobService { private final static Logger LOGGER = LoggerFactory.getLogger(MonitorJobServiceImpl.class); private final static Pattern EXPR_PATTERN = Pattern.compile("(?<=given input columns: \\[).*?(?=];)"); private final static Pattern CHINESE_PATTERN = Pattern.compile("[\u4e00-\u9fa5]"); @Resource final MonitorJobMapper monitorJobMapper; final IMonitorHistoryService monitorHistoryService; final IMonitorLinkisService monitorLinkisService; final IMonitorRuleResultService monitorRuleService; final IMonitorQuartzService monitorQuartzService; public MonitorJobServiceImpl(MonitorJobMapper monitorJobMapper, IMonitorHistoryService monitorHistoryService, IMonitorLinkisService monitorLinkisService, IMonitorRuleResultService monitorRuleService, IMonitorQuartzService monitorQuartzService) { this.monitorJobMapper = monitorJobMapper; this.monitorHistoryService = monitorHistoryService; this.monitorLinkisService = monitorLinkisService; this.monitorRuleService = monitorRuleService; this.monitorQuartzService = monitorQuartzService; } @Override public Integer monitorOnce(Integer id, String group, String tokenUser, String dt, CommonConstant.ExecType execType) { MonitorRuleDetailResponse body = monitorRuleService.getById(id); Integer ruleId = body.getRuleId(); String sqlTmp = body.getSqlTmp(); MonitorJobDetailResponse job = ObjectUtils.copy(body, MonitorJobDetailResponse.class); // 提交任务 JSONObject data = monitorLinkisService.submit_1x(sqlTmp, job.getId() + job.getJobName(), tokenUser, execType); // 维护执行历史表。将执行记录写入执行历史表 MonitorHistoryAddRequest entity = ObjectUtils.copy(job, MonitorHistoryAddRequest.class, "id"); // job表的id <--> 执行历史表的job_id entity.setJobId(job.getId()); // 设置执行类型,0-定时执行,1-手动执行,2-调度平台执行,3-补数执行 entity.setExecType(CommonConstant.getValueByNameForJobGroupName(group)); // 新建状态,设置任务为未完成状态 entity.setCompleted(CommonConstant.Completed.UNCOMPLETE.getValue()); // 新建状态,设置执行状态为初始化 entity.setExecStatus(CommonConstant.ExecutionNodeStatus.Inited.getState()); // 新建状态,设置执行进度0.0 entity.setExecProgress("0.0"); // 新建状态,设置开始时间为当前时间 entity.setStartTime(LocalDateTime.now()); // 设置execID entity.setExecId(data.get("execID").toString()); // 设置taskID entity.setTaskId(data.get("taskID").toString()); // 设置tokenUser entity.setTokenUser(tokenUser); MonitorHistoryDetailResponse hisEntity = monitorHistoryService.save(entity); LOGGER.info("[Monitor-Once] 执行成功!新建执行历史记录, 执行历史ID:[{}], 任务ID:[{}], 执行类型:[{}]", hisEntity.getId(), id, group); return hisEntity.getId(); } } }
整合quartz
想要通过quartz动态自定义定时任务,需要持久化一些quartz相关的111张表,以下为建表sql
USE dip_vehile_monitor; -- QUARTZ_TABLE DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS; DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS; DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE; DROP TABLE IF EXISTS QRTZ_LOCKS; DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS; DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS; DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS; DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS; DROP TABLE IF EXISTS QRTZ_TRIGGERS; DROP TABLE IF EXISTS QRTZ_JOB_DETAILS; DROP TABLE IF EXISTS QRTZ_CALENDARS; CREATE TABLE QRTZ_JOB_DETAILS( SCHED_NAME VARCHAR(120) NOT NULL, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, DESCRIPTION VARCHAR(250) NULL, JOB_CLASS_NAME VARCHAR(250) NOT NULL, IS_DURABLE VARCHAR(1) NOT NULL, IS_NONCONCURRENT VARCHAR(1) NOT NULL, IS_UPDATE_DATA VARCHAR(1) NOT NULL, REQUESTS_RECOVERY VARCHAR(1) NOT NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, DESCRIPTION VARCHAR(250) NULL, NEXT_FIRE_TIME BIGINT(13) NULL, PREV_FIRE_TIME BIGINT(13) NULL, PRIORITY INTEGER NULL, TRIGGER_STATE VARCHAR(16) NOT NULL, TRIGGER_TYPE VARCHAR(8) NOT NULL, START_TIME BIGINT(13) NOT NULL, END_TIME BIGINT(13) NULL, CALENDAR_NAME VARCHAR(200) NULL, MISFIRE_INSTR SMALLINT(2) NULL, JOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP) REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_SIMPLE_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, REPEAT_COUNT BIGINT(7) NOT NULL, REPEAT_INTERVAL BIGINT(12) NOT NULL, TIMES_TRIGGERED BIGINT(10) NOT NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_CRON_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, CRON_EXPRESSION VARCHAR(120) NOT NULL, TIME_ZONE_ID VARCHAR(80), PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_SIMPROP_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, STR_PROP_1 VARCHAR(512) NULL, STR_PROP_2 VARCHAR(512) NULL, STR_PROP_3 VARCHAR(512) NULL, INT_PROP_1 INT NULL, INT_PROP_2 INT NULL, LONG_PROP_1 BIGINT NULL, LONG_PROP_2 BIGINT NULL, DEC_PROP_1 NUMERIC(13,4) NULL, DEC_PROP_2 NUMERIC(13,4) NULL, BOOL_PROP_1 VARCHAR(1) NULL, BOOL_PROP_2 VARCHAR(1) NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_BLOB_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, BLOB_DATA BLOB NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP), INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP), FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP) REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_CALENDARS ( SCHED_NAME VARCHAR(120) NOT NULL, CALENDAR_NAME VARCHAR(200) NOT NULL, CALENDAR BLOB NOT NULL, PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)) ENGINE=InnoDB; CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS ( SCHED_NAME VARCHAR(120) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)) ENGINE=InnoDB; CREATE TABLE QRTZ_FIRED_TRIGGERS ( SCHED_NAME VARCHAR(120) NOT NULL, ENTRY_ID VARCHAR(95) NOT NULL, TRIGGER_NAME VARCHAR(200) NOT NULL, TRIGGER_GROUP VARCHAR(200) NOT NULL, INSTANCE_NAME VARCHAR(200) NOT NULL, FIRED_TIME BIGINT(13) NOT NULL, SCHED_TIME BIGINT(13) NOT NULL, PRIORITY INTEGER NOT NULL, STATE VARCHAR(16) NOT NULL, JOB_NAME VARCHAR(200) NULL, JOB_GROUP VARCHAR(200) NULL, IS_NONCONCURRENT VARCHAR(1) NULL, REQUESTS_RECOVERY VARCHAR(1) NULL, PRIMARY KEY (SCHED_NAME,ENTRY_ID)) ENGINE=InnoDB; CREATE TABLE QRTZ_SCHEDULER_STATE ( SCHED_NAME VARCHAR(120) NOT NULL, INSTANCE_NAME VARCHAR(200) NOT NULL, LAST_CHECKIN_TIME BIGINT(13) NOT NULL, CHECKIN_INTERVAL BIGINT(13) NOT NULL, PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)) ENGINE=InnoDB; CREATE TABLE QRTZ_LOCKS ( SCHED_NAME VARCHAR(120) NOT NULL, LOCK_NAME VARCHAR(40) NOT NULL, PRIMARY KEY (SCHED_NAME,LOCK_NAME)) ENGINE=InnoDB; CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY); CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME); CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME); CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE); CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME); CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY); CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP); CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP); CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP); -- HTTPJOB_DETAILS TABLE DROP TABLE IF EXISTS HTTPJOB_DETAILS; CREATE TABLE HTTPJOB_DETAILS( ID INT(11) NOT NULL AUTO_INCREMENT, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, DESCRIPTION varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, REQUEST_TYPE varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, HTTP_URL VARCHAR(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, HTTP_PARAMS VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, CREATE_TIME TIMESTAMP(0) NULL DEFAULT CURRENT_TIMESTAMP, UPDATE_TIME TIMESTAMP(0) NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0), PRIMARY KEY (ID)) ENGINE=InnoDB; -- HTTPJOB_LOGS TABLE DROP TABLE IF EXISTS HTTPJOB_LOGS; CREATE TABLE HTTPJOB_LOGS( ID INT(11) NOT NULL AUTO_INCREMENT, JOB_NAME VARCHAR(200) NOT NULL, JOB_GROUP VARCHAR(200) NOT NULL, REQUEST_TYPE varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, HTTP_URL VARCHAR(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, HTTP_PARAMS VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, FIRE_TIME TIMESTAMP(0) NULL DEFAULT CURRENT_TIMESTAMP, RESULT VARCHAR(500) CHARACTER SET utf8 COLLATE utf8_general_ci NULL, PRIMARY KEY (ID)) ENGINE=InnoDB; ALTER TABLE HTTPJOB_DETAILS ADD UNIQUE INDEX `UNIQUEIDX_HTTPJOB_JN_JG`(`JOB_NAME`, `JOB_GROUP`); ALTER TABLE HTTPJOB_LOGS ADD INDEX `IDX_HTTPJOBHISTORY_JN_JG`(`JOB_NAME`, `JOB_GROUP`); commit;
quarzt.properties
#quarz相关配置 # =========================================================================== org.quartz.scheduler.instanceName: MyQuartzScheduler org.quartz.scheduler.instanceId: AUTO #============================================================================ # Configure ThreadPool #============================================================================ org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 25 org.quartz.threadPool.threadPriority: 5 #============================================================================ # Configure JobStore #============================================================================ org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.class = org.springframework.scheduling.quartz.LocalDataSourceJobStore org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.useProperties: false org.quartz.jobStore.dataSource: myDS org.quartz.jobStore.tablePrefix: QRTZ_ org.quartz.jobStore.isClustered: true org.quartz.jobStore.clusterCheckinInterval: 20000 org.quartz.jobStore.acquireTriggersWithinLock: true ## Spring DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties) spring.quartz.properties.org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver spring.quartz.properties.org.quartz.dataSource.myDS.URL = jdbc:mysql://dip57-mysql-test.chj.cloud:3306/myDS spring.quartz.properties.org.quartz.dataSource.myDS.user = dip_vehile_monitor_rw spring.quartz.properties.org.quartz.dataSource.myDS.password = cgwcr8so!jedrR85 spring.quartz.properties.org.quartz.dataSource.myDS.maxConnections = 5
QuartzJobFactory类
@Configuration public class QuartzJobFactory extends AdaptableJobFactory { /** * @description:这个对象Spring会帮我们自动注入进来 * @author: */ @Autowired private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); //通过以下方式,解决Job任务无法使用Spring中的Bean问题 autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } }
QuartzSchedulerConfig
/** * @description:quartz配置类,将调度器交给spring管理 * @author * @time: */ @Configuration public class QuartzSchedulerConfig implements SchedulerFactoryBeanCustomizer { @Autowired private DataSource dataSource; @Autowired private QuartzJobFactory quartzJobFactory; @Bean public SchedulerFactoryBean schedulerFactoryBean() { SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); try { schedulerFactoryBean.setAutoStartup(true); schedulerFactoryBean.setDataSource(dataSource); schedulerFactoryBean.setJobFactory(quartzJobFactory); schedulerFactoryBean.setQuartzProperties(properties()); schedulerFactoryBean.setOverwriteExistingJobs(true); // 延迟3s启动quartz schedulerFactoryBean.setStartupDelay(3); schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(true); } catch (IOException e) { e.printStackTrace(); } return schedulerFactoryBean; } /** * @description:读取自定义的properties文件 * @author: jie */ @Bean public Properties properties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } @Bean(name = "scheduler") public Scheduler scheduler() { return schedulerFactoryBean().getScheduler(); } @Override public void customize(SchedulerFactoryBean schedulerFactoryBean) { schedulerFactoryBean.setStartupDelay(2); schedulerFactoryBean.setAutoStartup(true); schedulerFactoryBean.setOverwriteExistingJobs(true); schedulerFactoryBean.setJobFactory(quartzJobFactory); } }
QuartzServiceImp
@Service("MonitorQuartzService") @Transactional public class MonitorQuartzServiceImp implements IMonitorQuartzService { private final static Logger LOGGER = LoggerFactory.getLogger(MonitorQuartzServiceImp.class); /** * 注入任务调度器 */ @Autowired private Scheduler scheduler; @Override public void addJob(Class<? extends Job> clazz, JobKey jobKey, JobDataMap jobDataMap) { try { if (scheduler.checkExists(jobKey)) { LOGGER.info("[Quartz-addJob] 任务已存在,无需创建!name:{}, group:{}", jobKey.getName(), jobKey.getGroup()); return; } // 构建JobDetail信息 JobDetail jobDetail = JobBuilder.newJob(clazz) .withIdentity(jobKey) .setJobData(jobDataMap) .storeDurably() .build(); // 添加任务 ,replace=false,不允许有重复的 scheduler.addJob(jobDetail, false); LOGGER.info("[Quartz-addJob] 添加任务成功!name:{}, group:{}", jobKey.getName(), jobKey.getGroup()); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-addJob] 添加任务失败! name: [%s], group: [%s], 异常信息: [%s]",jobKey.getName(), jobKey.getGroup(), e.getMessage())); } } @Override public void scheduleJob(Class<? extends Job> clazz, String jobName, String jobGroupName, String cronExpression) { try { JobKey jobKey = new JobKey(jobName, jobGroupName); if (scheduler.checkExists(jobKey)) { LOGGER.error("[Quartz-scheduleJob] 任务已存在,无法添加调度!name:{}, group:{}", jobName, jobGroupName); throw new DmpException(String.format("[Quartz-scheduleJob] 任务已存在,无法添加调度! name: %s, group: %s",jobName, jobGroupName)); } // 构建JobDetail信息 JobDetail jobDetail = JobBuilder.newJob(clazz) .withIdentity(jobKey) .storeDurably() .build(); // 表达式调度构建器(即任务执行的时间) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression) // 忽略所有错过任务,按照正常任务执行 .withMisfireHandlingInstructionDoNothing(); // 添加或更新 TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); CronTrigger trigger; if (scheduler.checkExists(triggerKey)) { trigger = (CronTrigger) scheduler.getTrigger(triggerKey); } else { trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) .withSchedule(scheduleBuilder).build(); } // 添加任务 scheduler.scheduleJob(jobDetail, trigger); LOGGER.info("[Quartz-scheduleJob] 添加Cron调度任务成功!name:{}, group:{}", jobName, jobGroupName); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-scheduleJob] 添加Cron调度任务失败! name: [%s], group: [%s], 异常信息: %s",jobName, jobGroupName, e.getMessage())); } } @Override public void pauseTrigger(String jobName, String jobGroupName) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); if (!scheduler.checkExists(triggerKey)) { LOGGER.error("[Quartz-pauseTrigger] 调度不存在!name:{}, group:{}", jobName, jobGroupName); throw new DmpException(String.format("[Quartz-pauseTrigger] 调度不存在! name: %s, group: %s",jobName, jobGroupName)); } scheduler.pauseTrigger(triggerKey); LOGGER.info("[Quartz-pauseTrigger] 暂停调度成功!name:{}, group:{}", jobName, jobGroupName); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-pauseTrigger] 暂停调度失败! name: [%s], group: [%s], 异常信息: %s",jobName, jobGroupName, e.getMessage())); } } @Override public void resumeTrigger(String jobName, String jobGroupName) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); if (!scheduler.checkExists(triggerKey)) { LOGGER.error("[Quartz-resumeTrigger] 调度不存在!name:{}, group:{}", jobName, jobGroupName); throw new DmpException(String.format("[Quartz-resumeTrigger] 调度不存在! name: %s, group: %s",jobName, jobGroupName)); } scheduler.resumeTrigger(triggerKey); LOGGER.info("[Quartz-resumeTrigger] 恢复调度成功!name:{}, group:{}", jobName, jobGroupName); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-resumeTrigger] 恢复调度失败! name: [%s], group: [%s], 异常信息: %s",jobName, jobGroupName, e.getMessage())); } } @Override public void deleteJob(String jobName, String jobGroupName) { try { JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); if (!scheduler.checkExists(jobKey)) { LOGGER.info("[Quartz-deleteJob] 任务不存在,无需删除!name:{}, group:{}", jobName, jobGroupName); return; } scheduler.deleteJob(jobKey); LOGGER.info("[Quartz-deleteJob] 删除任务成功!name:{}, group:{}", jobName, jobGroupName); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-deleteJob] 删除任务失败! name: [%s], group: [%s], 异常信息: [%s]",jobName, jobGroupName, e.getMessage())); } } @Override public void scheduleJobExistsIgnore(Class<? extends Job> clazz, String jobName, String jobGroupName, String cronExpression) { try { TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName); Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey); if (!scheduler.checkExists(triggerKey)) { LOGGER.info("[Quartz-scheduleJobExistsIgnore] 调度不存在,删除重建!name:{}, group:{}", jobName, jobGroupName); this.deleteJob(jobName, jobGroupName); this.scheduleJob(clazz, jobName, jobGroupName, cronExpression); } else { this.resumeTrigger(jobName, jobGroupName); } LOGGER.info("[Quartz-scheduleJobExistsIgnore] 添加Cron调度任务成功!name:{}, group:{}", jobName, jobGroupName); } catch (SchedulerException e) { e.printStackTrace(); throw new DmpException(String.format("[Quartz-scheduleJobExistsIgnore] 添加Cron调度任务失败! name: [%s], group: [%s], 异常信息: %s",jobName, jobGroupName, e.getMessage())); } } /** * 更新cron表达式 * @param jobName * @param jobGroup * @param cronExpression */ @Override public void updateCronExpression(String jobName, String jobGroup, String cronExpression) { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup); //表达式调度构建器(即任务执行的时间) CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression); //按新的cronExpression重新构建trigger CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity(triggerKey) .withSchedule(scheduleBuilder).build(); try { scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { throw new RuntimeException(e); } } }
linkis相关job调用
/** * 提交Linkis执行 */ @DisallowConcurrentExecution public class LinkisJobBean extends QuartzJobBean { private final static Logger LOGGER = LoggerFactory.getLogger(LinkisJobBean.class); @Autowired private IMonitorJobService monitorJobService; @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { JobDetail jobDetail = context.getJobDetail(); System.out.println(jobDetail.getKey().getName() + "=====>"); List<MonitorJobDetailResponse> list = monitorJobService.getList(); MonitorJobDetailResponse monitorJobDetailResponse = list.get(list.size() - 1); String jobName = monitorJobDetailResponse.getJobName(); if (jobName.equals(jobDetail.getKey().getName())) { Integer id = monitorJobDetailResponse.getId(); String execType="manual"; String tokenUser ="test"; try { Integer historyId = monitorJobService.monitorOnce(id, execType, tokenUser, null, CommonConstant.ExecType.SCHE); LOGGER.info("[LinkisJobBean-execute] 提交linkis成功!具体任务请到【执行历史-定时-ID:{}】查看, tokenUser:[{}]", historyId, tokenUser); } catch (Exception e) { String errMsg = String.format("[LinkisJobBean-execute] 提交Quartz定时任务失败! tokenUser:[%s], jobDetail:[%s], 错误信息:[%s]", tokenUser, id.toString(), e.getMessage()); LOGGER.error(errMsg, e); throw new DmpException(errMsg); } } } }
QuartzRunner类(核心启动类)
@Component public class QuartzRunner implements ApplicationRunner { private final static Logger LOGGER = LoggerFactory.getLogger(QuartzRunner.class); @Autowired private IMonitorQuartzService monitorQuartzService; @Autowired private IMonitorJobService monitorJobService; @Override public void run(ApplicationArguments args) throws Exception { List<MonitorJobDetailResponse> list = monitorJobService.getList(); MonitorJobDetailResponse monitorJobDetailResponse = list.get(list.size() - 1); String cronExpress = monitorJobDetailResponse.getCronExpress(); String jobName = monitorJobDetailResponse.getJobName(); String group = jobName + monitorJobDetailResponse.getId(); LOGGER.info("[QuartzRunner] 启动定时调度开始!"); monitorQuartzService.scheduleJobExistsIgnore(LinkisJobBean.class,jobName,group,cronExpress); LOGGER.info("[QuartzRunner] 启动启动定时调度结束!"); } }
Controller层....