前一篇文章讲述了KS的整体项目目录，这篇文章来讲述一下KS调度模块中指标采集与元数据同步的实现。
一、调度模块的代码主要位于 km-task 模块中
public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> { private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class); @Override public void onApplicationEvent(ClusterPhyAddedEvent event) { LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId()); Long now = System.currentTimeMillis(); // 交由KS自定义的线程池,异步执行任务 FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now)); } private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) { ClusterPhy tempClusterPhy = null; // 120秒内无加载进来,则直接返回退出 while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) { tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); if (tempClusterPhy != null) { break; } BackoffUtils.backoff(1000); } if (tempClusterPhy == null) { return; } // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定 BackoffUtils.backoff(5000); final ClusterPhy clusterPhy = tempClusterPhy; // 集群执行集群元信息同步 List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values()); for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) { try { dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); } catch (Exception e) { // ignore } } // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定 BackoffUtils.backoff(5000); // 集群集群指标采集 List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values()); for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) { try { dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs); } catch (Exception e) { // ignore } } } }
通过监听集群添加事件（ClusterPhyAddedEvent），先触发元数据同步任务，再触发指标采集调度任务。
具体实现可参考:
Spring 根据接口或抽象类获取所有子类 Bean 并执行: https://blog.csdn.net/u012501054/article/details/103927674
二、分布式部署下，调度任务如何做到单节点运行，避免被多台机器重复调度
AbstractDispatchTask 中的 execute 方法实现了任务分配：先获取全部任务，再通过 selectTask 计算出当前机器负责的那部分任务，只处理这部分。
public TaskResult execute(JobContext jobContext) { try { long triggerTimeUnitMs = System.currentTimeMillis(); // 获取所有的任务 List<E> allTaskList = this.listAllTasks(); if (ValidateUtils.isEmptyList(allTaskList)) { LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext); return TaskResult.SUCCESS; } // 计算当前机器需要执行的任务 List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode()); if (ValidateUtils.isEmptyList(allTaskList)) { LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext); return TaskResult.SUCCESS; } // 进行任务处理 TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs); //组装信息 TaskResult taskResult = new TaskResult(); taskResult.setCode(ret.getCode()); taskResult.setMessage(ConvertUtil.list2String(subTaskList, ",")); return taskResult; } catch (Exception e) { LOGGER.error("process task failed, taskName:{}", taskName, e); return new TaskResult(TaskResult.FAIL_CODE, e.toString()); } }
对应代码解释如下:
参考: