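Preface
I previously wrote a beginner's guide to xxl-job:
"Springboot 整合 xxljob 使用定时任务调度(新手入门篇)" (Spring Boot integration with xxl-job for scheduled tasks, beginner's guide), on 小目标青年's CSDN blog
That one was very, very simple: a quick Spring Boot integration with xxl-job that you can pick up and use as-is, running scheduled tasks through the admin page plus a bit of code.
In this article we build on that one with some more advanced usage: scheduling jobs dynamically.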
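At work we often run into business scenarios like these (examples):
After some business action runs, a scheduled job needs to be created;
Once some check passes, a job needs to be stopped;
Once some condition is met, a job needs to be run again;
Under some condition... a job needs to be removed;
Cards on the table, here is the result we're after:
Through an API (or plain method calls), we can create, delete, update and query jobs, set their schedule rules, and so on, whenever we like.
The previous article did almost everything (creating, starting, stopping, deleting jobs) through the HTML admin page, which needs a human at the keyboard. This one frees up our hands!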
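Main text
As usual, a rough sketch of the idea:
Roughly: the admin service exposes a few open endpoints, and every demo service hooked up to xxl-job can call them to schedule jobs dynamically. When to schedule is up to your own business scenario.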
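To make that picture a bit more concrete, here is a minimal sketch of how business code might see the admin endpoints. Everything in it is hypothetical: `JobAdminClient` is not an xxl-job interface, and the in-memory implementation only stands in for the real HTTP calls so the flow (create, stop, restart, remove) can run without a server.

```java
import java.util.HashMap;
import java.util.Map;

final class DynamicJobFlowSketch {

    /** Hypothetical client-side view of the admin endpoints; not part of xxl-job. */
    interface JobAdminClient {
        int save(String cron, String executorHandler); // returns the new job id
        boolean stop(int jobId);
        boolean start(int jobId);
        boolean delete(int jobId);
    }

    /** In-memory stand-in: tracks only whether each job is running. */
    static final class InMemoryJobAdminClient implements JobAdminClient {
        private final Map<Integer, Boolean> running = new HashMap<>();
        private int nextId = 1;

        @Override
        public int save(String cron, String executorHandler) {
            int id = nextId++;
            running.put(id, true); // a newly saved job starts in the running state
            return id;
        }

        @Override
        public boolean stop(int jobId) {
            return running.replace(jobId, false) != null; // false if the job is unknown
        }

        @Override
        public boolean start(int jobId) {
            return running.replace(jobId, true) != null;
        }

        @Override
        public boolean delete(int jobId) {
            return running.remove(jobId) != null;
        }

        boolean isRunning(int jobId) {
            return Boolean.TRUE.equals(running.get(jobId));
        }
    }

    public static void main(String[] args) {
        InMemoryJobAdminClient client = new InMemoryJobAdminClient();
        // "some business action happened, so create a job"
        int jobId = client.save("0/5 * * * * ?", "clockInJobHandler");
        System.out.println("created job " + jobId + ", running=" + client.isRunning(jobId));
        // "some check passed, so stop it"
        client.stop(jobId);
        System.out.println("after stop, running=" + client.isRunning(jobId));
        // "no longer needed, so remove it"
        client.delete(jobId);
    }
}
```

In a real service the interface would be backed by HTTP calls to the admin, and the business code would only ever see the four operations.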
① Add endpoints to the admin service. To be fair, it already ships with plenty of API endpoints, but this time I insisted on rolling my own.
The endpoints:
MyDynamicApiController.java:
import com.xxl.job.admin.controller.annotation.PermissionLimit;
import com.xxl.job.admin.core.cron.CronExpression;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobQuery;
import com.xxl.job.admin.core.thread.JobScheduleHelper;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.admin.service.LoginService;
import com.xxl.job.admin.service.XxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.text.ParseException;
import java.util.Date;
import java.util.Map;

/**
 * @Author: JCccc
 * @Date: 2022-6-2 14:23
 * @Description: xxl job rest api
 */
@RestController
@RequestMapping("/api/jobinfo")
public class MyDynamicApiController {

    private static final Logger logger = LoggerFactory.getLogger(MyDynamicApiController.class);

    @Autowired
    private XxlJobService xxlJobService;

    @Autowired
    private LoginService loginService;

    // Paged job query; the filters come from the XxlJobQuery request body.
    @RequestMapping(value = "/pageList", method = RequestMethod.POST)
    public Map<String, Object> pageList(@RequestBody XxlJobQuery xxlJobQuery) {
        return xxlJobService.pageList(
                xxlJobQuery.getStart(),
                xxlJobQuery.getLength(),
                xxlJobQuery.getJobGroup(),
                xxlJobQuery.getTriggerStatus(),
                xxlJobQuery.getJobDesc(),
                xxlJobQuery.getExecutorHandler(),
                xxlJobQuery.getAuthor());
    }

    // Create (id == 0) or update (id != 0) a job and put it straight into the running state.
    @PostMapping("/save")
    public ReturnT<String> add(@RequestBody(required = true) XxlJobInfo jobInfo) {
        // next trigger time (takes effect after 5s, to stay clear of the scheduler's pre-read window)
        long nextTriggerTime = 0;
        try {
            Date nextValidTime = new CronExpression(jobInfo.getJobCron())
                    .getNextValidTimeAfter(new Date(System.currentTimeMillis() + JobScheduleHelper.PRE_READ_MS));
            if (nextValidTime == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_never_fire"));
            }
            nextTriggerTime = nextValidTime.getTime();
        } catch (ParseException e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE,
                    I18nUtil.getString("jobinfo_field_cron_unvalid") + " | " + e.getMessage());
        }

        jobInfo.setTriggerStatus(1);
        jobInfo.setTriggerLastTime(0);
        jobInfo.setTriggerNextTime(nextTriggerTime);
        jobInfo.setUpdateTime(new Date());

        if (jobInfo.getId() == 0) {
            return xxlJobService.add(jobInfo);
        } else {
            return xxlJobService.update(jobInfo);
        }
    }

    @RequestMapping(value = "/delete", method = RequestMethod.GET)
    public ReturnT<String> delete(int id) {
        return xxlJobService.remove(id);
    }

    @RequestMapping(value = "/start", method = RequestMethod.GET)
    public ReturnT<String> start(int id) {
        return xxlJobService.start(id);
    }

    @RequestMapping(value = "/stop", method = RequestMethod.GET)
    public ReturnT<String> stop(int id) {
        return xxlJobService.stop(id);
    }

    // Login is the only endpoint here that skips the permission check;
    // the other endpoints expect the cookie that this call sets.
    @RequestMapping(value = "/login", method = RequestMethod.GET)
    @PermissionLimit(limit = false)
    public ReturnT<String> loginDo(HttpServletRequest request, HttpServletResponse response,
                                   String userName, String password, String ifRemember) {
        boolean ifRem = "on".equals(ifRemember);
        return loginService.login(request, response, userName, password, ifRem);
    }
}
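The `/save` endpoint pushes the first trigger time past the scheduler's pre-read window (`JobScheduleHelper.PRE_READ_MS`, 5 seconds per the comment in the code). Below is a minimal sketch of just that arithmetic for the simplest case, an "every N seconds" rule such as `0/5 * * * * ?`. It is not xxl-job's `CronExpression`; the class and method names are made up for illustration, and it assumes N divides 60 evenly so the firing times line up with whole multiples of N seconds.

```java
final class NextTriggerSketch {

    // Same value as the 5s pre-read window mentioned in the controller comment.
    static final long PRE_READ_MS = 5000L;

    /**
     * First firing time strictly after (now + pre-read window) for a job
     * that fires every `everySeconds` seconds, aligned to the minute.
     */
    static long nextTriggerTime(long nowMs, int everySeconds) {
        if (everySeconds <= 0 || 60 % everySeconds != 0) {
            throw new IllegalArgumentException("everySeconds must be a positive divisor of 60");
        }
        long stepMs = everySeconds * 1000L;
        long earliestMs = nowMs + PRE_READ_MS;
        // Round down to the current slot, then move to the next one,
        // so the result is always strictly later than earliestMs.
        return (earliestMs / stepMs + 1) * stepMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("now=" + now + ", first trigger=" + nextTriggerTime(now, 5));
    }
}
```

The point is only that a job saved "now" never gets a trigger time inside the next 5 seconds, so the scheduler's pre-read loop cannot miss its first run.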
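XxlJobQuery.java
(There's a reason I wrote this query class specifically for this article. It's meant as a starting point: as I mentioned in the beginner's guide, xxl-job is backed by a database, which means it's easy to extend.)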
/**
 * @Author: JCccc
 * @Date: 2022-6-2 14:23
 * @Description: xxl job rest api
 */
public class XxlJobQuery {

    private int start;
    private int length;
    private int triggerStatus;
    private String jobDesc;
    private String executorHandler;
    private String author;
    private int jobGroup;

    public int getStart() { return start; }
    public void setStart(int start) { this.start = start; }

    public int getLength() { return length; }
    public void setLength(int length) { this.length = length; }

    public int getTriggerStatus() { return triggerStatus; }
    public void setTriggerStatus(int triggerStatus) { this.triggerStatus = triggerStatus; }

    public String getJobDesc() { return jobDesc; }
    public void setJobDesc(String jobDesc) { this.jobDesc = jobDesc; }

    public String getExecutorHandler() { return executorHandler; }
    public void setExecutorHandler(String executorHandler) { this.executorHandler = executorHandler; }

    public String getAuthor() { return author; }
    public void setAuthor(String author) { this.author = author; }

    public int getJobGroup() { return jobGroup; }
    public void setJobGroup(int jobGroup) { this.jobGroup = jobGroup; }
}
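② In the demo service connected to xxl-job, start playing with dynamic scheduling, which really just means calling the admin endpoints we wrote above.
PS: This article has some bad coding habits in it, for example returning a Map, printing to stdout instead of using a logger, and not returning a unified response type from the endpoints. The point of this article is to show how dynamic scheduling works; adjust the details yourselves. We're all adults here, so don't mind these bits.
First, pull a few jars into pom.xml:
(One is fastjson. Don't copy me here; I pass parameters around as JSONObject purely for convenience in this hands-on example.)
(The other is httpClient, used to call the admin service's API endpoints.)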
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
XxlJobInfo.java
This one is really the original author's class, since underneath we're still calling methods on the XxlJobService that xxl-job provides.
import java.util.Date;

/**
 * xxl-job info
 *
 * @author xuxueli 2016-1-12 18:25:49
 */
public class XxlJobInfo {

    private int id;                        // primary key ID
    private int jobGroup;                  // executor primary key ID
    private String jobDesc;                // description
    private String jobCron;
    private Date addTime;
    private Date updateTime;
    private String author;                 // owner
    private String alarmEmail;             // alarm email
    private String scheduleType;           // schedule type
    private String scheduleConf;           // schedule config; meaning depends on the schedule type
    private String misfireStrategy;        // misfire strategy
    private String executorRouteStrategy;  // executor routing strategy
    private String executorHandler;        // executor: job handler name
    private String executorParam;          // executor: job parameters
    private String executorBlockStrategy;  // blocking strategy
    private int executorTimeout;           // job execution timeout, in seconds
    private int executorFailRetryCount;    // retry count on failure
    private String glueType;               // GLUE type #com.xxl.job.core.glue.GlueTypeEnum
    private String glueSource;             // GLUE source code
    private String glueRemark;             // GLUE remark
    private Date glueUpdatetime;           // GLUE update time
    private String childJobId;             // child job IDs, comma-separated
    private int triggerStatus;             // schedule status: 0 = stopped, 1 = running
    private long triggerLastTime;          // last trigger time
    private long triggerNextTime;          // next trigger time

    public String getJobCron() { return jobCron; }
    public void setJobCron(String jobCron) { this.jobCron = jobCron; }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public int getJobGroup() { return jobGroup; }
    public void setJobGroup(int jobGroup) { this.jobGroup = jobGroup; }

    public String getJobDesc() { return jobDesc; }
    public void setJobDesc(String jobDesc) { this.jobDesc = jobDesc; }

    public Date getAddTime() { return addTime; }
    public void setAddTime(Date addTime) { this.addTime = addTime; }

    public Date getUpdateTime() { return updateTime; }
    public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }

    public String getAuthor() { return author; }
    public void setAuthor(String author) { this.author = author; }

    public String getAlarmEmail() { return alarmEmail; }
    public void setAlarmEmail(String alarmEmail) { this.alarmEmail = alarmEmail; }

    public String getScheduleType() { return scheduleType; }
    public void setScheduleType(String scheduleType) { this.scheduleType = scheduleType; }

    public String getScheduleConf() { return scheduleConf; }
    public void setScheduleConf(String scheduleConf) { this.scheduleConf = scheduleConf; }

    public String getMisfireStrategy() { return misfireStrategy; }
    public void setMisfireStrategy(String misfireStrategy) { this.misfireStrategy = misfireStrategy; }

    public String getExecutorRouteStrategy() { return executorRouteStrategy; }
    public void setExecutorRouteStrategy(String executorRouteStrategy) { this.executorRouteStrategy = executorRouteStrategy; }

    public String getExecutorHandler() { return executorHandler; }
    public void setExecutorHandler(String executorHandler) { this.executorHandler = executorHandler; }

    public String getExecutorParam() { return executorParam; }
    public void setExecutorParam(String executorParam) { this.executorParam = executorParam; }

    public String getExecutorBlockStrategy() { return executorBlockStrategy; }
    public void setExecutorBlockStrategy(String executorBlockStrategy) { this.executorBlockStrategy = executorBlockStrategy; }

    public int getExecutorTimeout() { return executorTimeout; }
    public void setExecutorTimeout(int executorTimeout) { this.executorTimeout = executorTimeout; }

    public int getExecutorFailRetryCount() { return executorFailRetryCount; }
    public void setExecutorFailRetryCount(int executorFailRetryCount) { this.executorFailRetryCount = executorFailRetryCount; }

    public String getGlueType() { return glueType; }
    public void setGlueType(String glueType) { this.glueType = glueType; }

    public String getGlueSource() { return glueSource; }
    public void setGlueSource(String glueSource) { this.glueSource = glueSource; }

    public String getGlueRemark() { return glueRemark; }
    public void setGlueRemark(String glueRemark) { this.glueRemark = glueRemark; }

    public Date getGlueUpdatetime() { return glueUpdatetime; }
    public void setGlueUpdatetime(Date glueUpdatetime) { this.glueUpdatetime = glueUpdatetime; }

    public String getChildJobId() { return childJobId; }
    public void setChildJobId(String childJobId) { this.childJobId = childJobId; }

    public int getTriggerStatus() { return triggerStatus; }
    public void setTriggerStatus(int triggerStatus) { this.triggerStatus = triggerStatus; }

    public long getTriggerLastTime() { return triggerLastTime; }
    public void setTriggerLastTime(long triggerLastTime) { this.triggerLastTime = triggerLastTime; }

    public long getTriggerNextTime() { return triggerNextTime; }
    public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; }
}
XxlJobUtil.java
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * @Author: JCccc
 * @Date: 2022-6-22 9:51
 * @Description: thin HTTP client for the admin endpoints in MyDynamicApiController
 */
public class XxlJobUtil {

    // Login cookie shared by all calls; set by login(), sent with every other request.
    private static volatile String cookie = "";

    /**
     * Query existing jobs (follow this whole call chain as a template for your own extension endpoints).
     *
     * @param url         base URL of the admin service
     * @param requestInfo query filters, serialized as the JSON request body
     * @return the parsed response, or null if the admin did not answer with HTTP 200
     */
    public static JSONObject pageList(String url, JSONObject requestInfo) throws HttpException, IOException {
        return doPost(url, "/api/jobinfo/pageList", requestInfo);
    }

    /**
     * Create or edit a job.
     *
     * @param url         base URL of the admin service
     * @param requestInfo the job definition, serialized as the JSON request body
     * @return the parsed response, or null if the admin did not answer with HTTP 200
     */
    public static JSONObject addJob(String url, JSONObject requestInfo) throws HttpException, IOException {
        return doPost(url, "/api/jobinfo/save", requestInfo);
    }

    /** Delete a job by ID. */
    public static JSONObject deleteJob(String url, int id) throws HttpException, IOException {
        return doGet(url, "/api/jobinfo/delete?id=" + id);
    }

    /** Start a job by ID. */
    public static JSONObject startJob(String url, int id) throws HttpException, IOException {
        return doGet(url, "/api/jobinfo/start?id=" + id);
    }

    /** Stop a job by ID. */
    public static JSONObject stopJob(String url, int id) throws HttpException, IOException {
        return doGet(url, "/api/jobinfo/stop?id=" + id);
    }

    public static JSONObject doGet(String url, String path) throws HttpException, IOException {
        HttpMethod get = new GetMethod(url + path);
        get.setRequestHeader("cookie", cookie);
        try {
            new HttpClient().executeMethod(get);
            return getJsonObject(get);
        } finally {
            get.releaseConnection();
        }
    }

    private static JSONObject doPost(String url, String path, JSONObject requestInfo) throws HttpException, IOException {
        PostMethod post = new PostMethod(url + path);
        post.setRequestHeader("cookie", cookie);
        post.setRequestEntity(new StringRequestEntity(requestInfo.toString(), "application/json", "utf-8"));
        try {
            new HttpClient().executeMethod(post);
            JSONObject result = getJsonObject(post);
            System.out.println(result); // prints "null" when the call failed
            return result;
        } finally {
            post.releaseConnection();
        }
    }

    // Reads the response body as UTF-8 JSON; returns null on any non-200 status or empty body.
    private static JSONObject getJsonObject(HttpMethod method) throws IOException {
        if (method.getStatusCode() != HttpStatus.SC_OK) {
            return null;
        }
        InputStream inputStream = method.getResponseBodyAsStream();
        if (inputStream == null) {
            return null;
        }
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                body.append(line);
            }
            return JSONObject.parseObject(body.toString());
        }
    }

    /**
     * Log in and remember the returned cookies for later calls.
     *
     * @param url      base URL of the admin service
     * @param userName admin user name
     * @param password admin password
     * @return the cookie header value, or "" if login did not return HTTP 200
     */
    public static String login(String url, String userName, String password) throws HttpException, IOException {
        // URL-encode the credentials so characters like '&' or '=' don't break the query string.
        String path = "/api/jobinfo/login?userName=" + URLEncoder.encode(userName, "UTF-8")
                + "&password=" + URLEncoder.encode(password, "UTF-8");
        HttpClient httpClient = new HttpClient();
        HttpMethod get = new GetMethod(url + path);
        try {
            httpClient.executeMethod(get);
            if (get.getStatusCode() == HttpStatus.SC_OK) {
                StringBuilder tmpCookies = new StringBuilder();
                for (Cookie c : httpClient.getState().getCookies()) {
                    tmpCookies.append(c.toString()).append(";");
                }
                cookie = tmpCookies.toString();
            } else {
                cookie = "";
            }
        } finally {
            get.releaseConnection();
        }
        return cookie;
    }
}
XxlJobController.java
(Used to simulate triggering job creation, editing, deletion, stopping, and so on.)
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.Date;

/**
 * @Author: JCccc
 * @Date: 2022-6-22 9:26
 * @Description: demo endpoints that drive the admin API through XxlJobUtil
 */
@RestController
public class XxlJobController {

    // Demo values: the local admin address and the admin account.
    private static final String ADMIN_URL = "http://127.0.0.1:8961/xxl-job-admin";
    private static final String ADMIN_USER = "admin";
    private static final String ADMIN_PASSWORD = "123456";

    @RequestMapping(value = "/pageList", method = RequestMethod.GET)
    public Object pageList() throws IOException {
        // Other supported filters: int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author
        JSONObject test = new JSONObject();
        test.put("length", 10);
        XxlJobUtil.login(ADMIN_URL, ADMIN_USER, ADMIN_PASSWORD);
        JSONObject response = XxlJobUtil.pageList(ADMIN_URL, test);
        return response == null ? null : response.get("data");
    }

    @RequestMapping(value = "/add", method = RequestMethod.GET)
    public void add() throws IOException {
        XxlJobInfo xxlJobInfo = new XxlJobInfo();
        xxlJobInfo.setJobCron("0/5 * * * * ?");
        xxlJobInfo.setJobGroup(3);
        xxlJobInfo.setJobDesc("Let me give it a try");
        xxlJobInfo.setAddTime(new Date());
        xxlJobInfo.setUpdateTime(new Date());
        xxlJobInfo.setAuthor("JCccc");
        xxlJobInfo.setAlarmEmail("864477182@com");
        xxlJobInfo.setScheduleType("CRON");
        xxlJobInfo.setScheduleConf("0/5 * * * * ?");
        xxlJobInfo.setMisfireStrategy("DO_NOTHING");
        xxlJobInfo.setExecutorRouteStrategy("FIRST");
        xxlJobInfo.setExecutorHandler("clockInJobHandler");
        xxlJobInfo.setExecutorParam("att");
        xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
        xxlJobInfo.setExecutorTimeout(0);
        xxlJobInfo.setExecutorFailRetryCount(1);
        xxlJobInfo.setGlueType("BEAN");
        xxlJobInfo.setGlueSource("");
        xxlJobInfo.setGlueRemark("GLUE code initialization");
        xxlJobInfo.setGlueUpdatetime(new Date());
        JSONObject test = (JSONObject) JSONObject.toJSON(xxlJobInfo);

        XxlJobUtil.login(ADMIN_URL, ADMIN_USER, ADMIN_PASSWORD);
        JSONObject response = XxlJobUtil.addJob(ADMIN_URL, test);
        System.out.println(isSuccess(response) ? "Job created" : "Failed to create job");
    }

    @RequestMapping(value = "/stop/{jobId}", method = RequestMethod.GET)
    public void stop(@PathVariable("jobId") Integer jobId) throws IOException {
        XxlJobUtil.login(ADMIN_URL, ADMIN_USER, ADMIN_PASSWORD);
        JSONObject response = XxlJobUtil.stopJob(ADMIN_URL, jobId);
        System.out.println(isSuccess(response) ? "Job stopped" : "Failed to stop job");
    }

    @RequestMapping(value = "/delete/{jobId}", method = RequestMethod.GET)
    public void delete(@PathVariable("jobId") Integer jobId) throws IOException {
        XxlJobUtil.login(ADMIN_URL, ADMIN_USER, ADMIN_PASSWORD);
        JSONObject response = XxlJobUtil.deleteJob(ADMIN_URL, jobId);
        System.out.println(isSuccess(response) ? "Job removed" : "Failed to remove job");
    }

    @RequestMapping(value = "/start/{jobId}", method = RequestMethod.GET)
    public void start(@PathVariable("jobId") Integer jobId) throws IOException {
        XxlJobUtil.login(ADMIN_URL, ADMIN_USER, ADMIN_PASSWORD);
        JSONObject response = XxlJobUtil.startJob(ADMIN_URL, jobId);
        System.out.println(isSuccess(response) ? "Job started" : "Failed to start job");
    }

    // XxlJobUtil returns null on a non-200 HTTP status, so check for that before reading "code".
    private static boolean isSuccess(JSONObject response) {
        return response != null && Integer.valueOf(200).equals(response.getInteger("code"));
    }
}
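The demo controller logs in before every single call, which is fine for a demo but wasteful in a real service. Here is a rough sketch of one way to log in once and reuse the cookie for a while. The class name, the `Supplier`-based login hook and the time-to-live are all my own assumptions for illustration; how long an admin cookie actually stays valid depends on your setup.

```java
import java.util.function.Supplier;

final class CookieCacheSketch {

    private final Supplier<String> loginCall; // performs the real login and returns the cookie string
    private final long ttlMs;                 // how long we trust a cookie before logging in again
    private String cookie = "";
    private long fetchedAtMs;

    CookieCacheSketch(Supplier<String> loginCall, long ttlMs) {
        this.loginCall = loginCall;
        this.ttlMs = ttlMs;
    }

    /** Returns the cached cookie, logging in again only if there is none or it is older than ttlMs. */
    synchronized String get(long nowMs) {
        if (cookie.isEmpty() || nowMs - fetchedAtMs >= ttlMs) {
            String fresh = loginCall.get();
            cookie = fresh == null ? "" : fresh; // an empty cookie forces a retry on the next call
            fetchedAtMs = nowMs;
        }
        return cookie;
    }

    public static void main(String[] args) {
        // Stand-in login that just returns a fixed value; a real one would call the admin.
        CookieCacheSketch cache = new CookieCacheSketch(() -> "session=abc", 60_000L);
        System.out.println(cache.get(System.currentTimeMillis()));
    }
}
```

To plug in the real login, the supplier would wrap the call to `XxlJobUtil.login(...)` and convert its checked `IOException` into an unchecked one.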
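Verification and testing
Creating a job:
Open the admin HTML page; this is the job we created by hand last time:
Now we create a new job through the API:
After calling the endpoint, you can see the job has been created and has started running on our cron and other scheduling rules:
Then we call the stop endpoint (stopping by job ID):
Stopped successfully: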
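At this point you may be wondering: how do I know the ID of the job I want to stop?
That's why I wrote the list query endpoint as a starting point. You get the idea: the data is all there, so extending it is easy, and how you use and schedule it for your business scenario is entirely up to you:
The data is right there in the database, so just query it: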
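As one possible way to go from "the data is there" to an actual job ID, here is a small sketch that scans the rows returned under `data` by the pageList endpoint and picks the job matching a handler name and description. It works on plain `Map` rows so it runs without fastjson; the class and method names are hypothetical, and it assumes each row carries the `id`, `executorHandler` and `jobDesc` fields from XxlJobInfo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

final class JobLookupSketch {

    /** Returns the id of the first row whose handler and description both match, if any. */
    static OptionalInt findJobId(List<Map<String, Object>> rows, String executorHandler, String jobDesc) {
        for (Map<String, Object> row : rows) {
            boolean sameHandler = executorHandler.equals(row.get("executorHandler"));
            boolean sameDesc = jobDesc.equals(row.get("jobDesc"));
            Object id = row.get("id");
            if (sameHandler && sameDesc && id instanceof Number) {
                return OptionalInt.of(((Number) id).intValue());
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Hand-built row standing in for one entry of the pageList "data" array.
        Map<String, Object> row = new HashMap<>();
        row.put("id", 12);
        row.put("executorHandler", "clockInJobHandler");
        row.put("jobDesc", "Let me give it a try");
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);

        OptionalInt jobId = findJobId(rows, "clockInJobHandler", "Let me give it a try");
        System.out.println(jobId.isPresent() ? "found job " + jobId.getAsInt() : "no such job");
    }
}
```

With the ID in hand, the stop, start and delete calls work as shown earlier. Matching on handler plus description is just one choice; any field that is unique for your business case would do.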
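That leaves the start and delete endpoints, plus updating a job (which goes through the same /save endpoint with a non-zero id); I won't demo each of them here.
How you combine these create, read, update and delete operations, and for what business purpose, is yours to play with.
All right, that's it for this one.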