从官方将源码下载下来,当前发布版本为2.2.0,该版本下载的为最新版本2.2.1,。代码相差不大,接下来对xxl-job进行一步一步拆解
github地址 GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
码云地址 xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
该框架有如下优点
1、使用层面
简单、好用,源码易读,修改简单
2、功能层面
1、有可视化界面进行操作,可以集中化管理任务
2、通过可视化可以对任务的管理,执行,调度,任务生命周期管理
3、任务执行支持手动和corn表达式定时触发
4、任务调度支持配置任务链以及任务执行失败的阻断策略
5、任务执行时间,执行信息等记录,可以更好地分析任务瓶颈以及执行分布时间
等等
1、xxl目录结构
xxl-job的目录如下
1.1 xxl-job-admin
该目录为调度中心源码,包括任务的管理,执行,调度,以及运行监控报表。喏,就下面这个
1.2 xxl-job-core
该代码为服务集成的核心源码包,包括重要注解,执行器等。
1.3 xxl-job-executor-samples
该包为服务集成测试demo,支持frameless,jfinal,spring,SpringBoot等多个框架集成。
2、架构设计
2.1 整体架构设计
通过服务注册的方式,将job注册到任务调度中心,通过调度中心进行统一的任务管理
2.2 服务集成
以SpringBoot集成xxl-job为例,集成直接运行demo即可,可以参考01.XXL-JOB这个文章。
1、配置文件
# web port spring web容器端口 server.port=8083 # no web #spring.main.web-environment=false # log config logging.config=classpath:logback.xml ### xxl-job注册中心地址 xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### xxl-job access token xxl.job.accessToken= ### xxl-job 执行器名称 xxl.job.executor.appname=omsJob ### xxl-job executor 执行器ip xxl.job.executor.ip= ### xxl-job executor 执行器端口 xxl.job.executor.port=9999 ### xxl-job executor log-path 日志配置 xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days 日志定时清理时间 xxl.job.executor.logretentiondays=30
2、初始化执行器
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; }
3、使用xxl-job注解配置任务
/** * 1、简单任务示例(Bean模式) */ @XxlJob("demoJobHandler") public ReturnT<String> demoJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { XxlJobLogger.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } return ReturnT.SUCCESS; }
4、启动成功后,服务会开启两个端口,一个是业务端口,一个是调度中心下发job执行的端口。(此处要优化,服务和job触发其实使用一个端口即可)
3、启动源码分析
1、服务启动流程
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; }
服务主要启动是初始化XxlJobSpringExecutor这个bean对象,该对象定义了执行器xxlJobSpringExecutor的相关配置,如注册中心地址,服务提供地址,以及授权token等。该对象类图如下,
XxlJobSpringExecutor执行器继承XxlJobExecutor并实现Spring的ApplicationContextAware等类,对该bean进行了增强。主要核心类为XxlJobExecutor。
因为继承SmartInitializingSingleton,所以bean初始化完成之后会执行afterSingletonsInstantiated方法,该类主要为initJobHandlerMethodRepository这个方法,用于扫描xxl-job注解,进行任务加载和管理
// start @Override public void afterSingletonsInstantiated() { // 扫描xxl-job注解,进行任务加载和管理 initJobHandlerMethodRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start try { super.start(); } catch (Exception e) { throw new RuntimeException(e); } }
initJobHandlerMethodRepository方法主要如下
该方法主要是通过获取Spring管理的容器bean,然后扫描带有xxljob注解的方法,将他们保存在jobHandlerRepository对象中
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // 扫描Spring管理的bean String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true); for (String beanDefinitionName : beanDefinitionNames) { Object bean = applicationContext.getBean(beanDefinitionName); Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean try { annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MethodIntrospector.MetadataLookup<XxlJob>() { @Override public XxlJob inspect(Method method) { // 获取注解为XxlJob的方法,并保存在annotatedMethods return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class); } }); } catch (Throwable ex) { logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex); } if (annotatedMethods==null || annotatedMethods.isEmpty()) { continue; } //获取方法属性,并存储在jobHandlerRepository对象中 for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) { Method method = methodXxlJobEntry.getKey(); XxlJob xxlJob = methodXxlJobEntry.getValue(); if (xxlJob == null) { continue; } String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // execute method if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } method.setAccessible(true); // init and destory Method initMethod = null; Method destroyMethod = null; if (xxlJob.init().trim().length() > 0) { try { initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } if (xxlJob.destroy().trim().length() > 0) { try { destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } // registry jobhandler //将任务存储在jobHandlerRepository对象中,后续下发任务使用 registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } }
之后执行super.start(),执行父类XxlJobExecutor的start方法。该方法主要有日志初始化,日志清理任务初始化,RPC调用触发器回调线程启动,调度中心列表初始化以及执行器端口初始化。
public void start() throws Exception { // init logpath //初始化任务执行日志路径 XxlJobFileAppender.initLogPath(logPath); // 日志定时清理任务 JobLogFileCleanThread.getInstance().start(logRetentionDays); // 初始化触发器回调线程(用RPC回调调度中心接口) TriggerCallbackThread.getInstance().start(); //初始化调度中心列表 initAdminBizList( adminAddresses, accessToken); // init executor-server 执行器端口启动 initEmbedServer(address, ip, port, appname, accessToken); }
XxlJobFileAppender.initLogPath(logPath)和JobLogFileCleanThread.getInstance().start(logRetentionDays)主要对执行日志进行初始化,就不多解释了,直接往下看。
TriggerCallbackThread.getInstance().start();
public void start() { // 调度中心注册表会否为空 if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // 监听阻塞队列 while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // 组装callback返回的参数 List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // 执行回调 if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); }
doCallback(callbackParamList)如下
/** * do callback, will retry if error * @param callbackParamList */ private void doCallback(List<HandleCallbackParam> callbackParamList){ boolean callbackRet = false; // 向所有的调度中心发送回调信息 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { //本质上是调用注册中心的api/callback接口。记录调用结果。 ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); callbackRet = true; break; } else { callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { appendFailCallbackFile(callbackParamList); } }
adminBiz.callback(callbackParamList)
调用注册中心api接口
@Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
initAdminBizList( adminAddresses, accessToken); 初始化注册中心列表,用于后期和注册中心交互
//扫描xxl.job.admin.addresses配置,将他们加入注册中心列表adminBizList对象中。用于后期发送回调 private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } }
// init executor-server initEmbedServer(address, ip, port, appname, accessToken);<核心>
//初始化xxljob执行器服务 private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { //初始化ip和端口,如果没有ip则自动获取本地ip port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // 启动服务 embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); }
embedServer.start(address, port, appname, accessToken); 本质上是一个Netty服务,标准的Netty服务启动,我们只看EmbedHttpServerHandler,Netty处理请求的handler
public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind ChannelFuture future = bootstrap.bind(port).sync(); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); //注册到调度中心 startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); } } finally { // stop try { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); }
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class); private ExecutorBiz executorBiz; //执行器 private String accessToken; //token private ThreadPoolExecutor bizThreadPool;//执行器线程池 public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; this.bizThreadPool = bizThreadPool; } @Override protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8);//获取请求数据 String uri = msg.uri(); HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); // invoke bizThreadPool.execute(new Runnable() { @Override public void run() { // 处理请求 Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // 格式化为JSON String responseJson = GsonTool.toJson(responseObj); // 写回客户端 writeResponse(ctx, keepAlive, responseJson); } }); } private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { // valid if (HttpMethod.POST != httpMethod) { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } if (uri==null || uri.trim().length()==0) { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); } if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.equals(accessTokenReq)) { return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong."); } // services mapping try { //接收注册中心请求接口处理 if ("/beat".equals(uri)) { return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { //注册中心执行接口 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e)); }
我们主要看下run方法的执行过程
@Override public ReturnT<String> run(TriggerParam triggerParam) { // 根据jobid加载对应的job执行信息,第一次执行为null JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;//根绝jobThread获取job处理handler String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); //获取任务类型 if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//获取任务的执行器 // 校验新老job是否一致,不一致将老的进行初始化。有可能任务更新。通过jobid获取的是老的 if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; //将新处理handler赋值给老的 if (jobHandler == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } } } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change handler or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } if (jobHandler == null) { try { IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } } } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change script or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); } // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // 如果jobThread 为null,则将任务信息注册到jobThreadRepository对象进行缓存,并启动线程 if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } //将任务推送到自己现成的任务队列中区 ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; }
看下registJobThread方法,该方法主要是根据任务信息,创建一个jobThread,之后启动该线程。然后将其缓存到jobThreadRepository中。如果存在老的任务,则将老的任务停掉。
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler);//创建新得jobThread对象 newJobThread.start();//启动线程 logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); //将新的jobThread放入map中,并弹出老的。 JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { //将老的停掉 oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; }
该线程执行如下,主要就是获取队列中的任务,然后通过新建FutureTask线程执行任务。之后将执行结果推到TriggerCallbackThread的队列中。通过TriggerCallbackThread推到任务调度中心,进行记录结果信息。
@Override public void run() { // init try { handler.init(); } catch (Throwable e) { logger.error(e.getMessage(), e); } // execute while(!toStop){ running = false; idleTimes++; TriggerParam triggerParam = null; ReturnT<String> executeResult = null; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) //弹出队列任务 triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); // log filename, like "logPath/yyyy-MM-dd/9999.log" String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId()); XxlJobContext.setXxlJobContext(new XxlJobContext( triggerParam.getLogId(), logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); // execute XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams()); if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { final TriggerParam triggerParamTmp = triggerParam; //创建futureTask线程任务,并执行 FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() { @Override public ReturnT<String> call() throws Exception { return handler.execute(triggerParamTmp.getExecutorParams()); } }); futureThread = new Thread(futureTask); futureThread.start(); //获取执行结果 executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { XxlJobLogger.log("<br>----------- xxl-job job execute timeout"); XxlJobLogger.log(e); executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout "); } finally { futureThread.interrupt(); } } else { // just execute executeResult = handler.execute(triggerParam.getExecutorParams()); } if (executeResult == null) { executeResult = IJobHandler.FAIL; } else { executeResult.setMsg( (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000) ?executeResult.getMsg().substring(0, 50000).concat("...") :executeResult.getMsg()); executeResult.setContent(null); // limit obj size } XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); } else { if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } } catch (Throwable e) { if (toStop) { XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); } StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String errorMsg = stringWriter.toString(); executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg); XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); } finally { //结果添加到回调队列中 if(triggerParam != null) { // callback handler info if (!toStop) { // commonm TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult)); } else { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); } } } } // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult)); } } // destroy try { handler.destroy(); } catch (Throwable e) { logger.error(e.getMessage(), e); } logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); }
至此,整个XXlJOb的启动和接收请求的处理大致梳理完了。代码整体很简单。就不细致展开了。了解整个设计思想即可。