KnowStreaming系列教程第三篇——调度任务模块

简介: KnowStreaming系列教程第三篇——调度任务模块

前一篇文章

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();
        // 交由KS自定义的线程池,异步执行任务
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }
    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;
        // 120秒内无加载进来,则直接返回退出
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }
            BackoffUtils.backoff(1000);
        }
        if (tempClusterPhy == null) {
            return;
        }
        // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;
        // 集群执行集群元信息同步
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
        // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
        BackoffUtils.backoff(5000);
        // 集群集群指标采集
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配

public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();
            // 获取所有的任务
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 计算当前机器需要执行的任务
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 进行任务处理
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);
            //组装信息
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
            return taskResult;
        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);
            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md


相关文章
|
8月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(二)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
8月前
|
设计模式 前端开发 Java
KnowStreaming系列教程第二篇——项目整体架构分析
KnowStreaming系列教程第二篇——项目整体架构分析
104 0
|
存储 数据采集 人工智能
如何设计一个监控平台(上篇)
在大型分布式微服务场景下,各个服务版本快速迭代,各类业务规模不断膨胀,同时监控的场景也在不断的发生变化,线上故障随时可能发生,各个平台错综复杂,如何保证线上服务稳定运行,同时提升运维效率,降低运维成本成了监控平台的挑战。
如何设计一个监控平台(上篇)
|
8月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(一)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
Kubernetes 调度 Perl
10分钟搞懂K8S的亲和与反亲和调度
首先来个一句话总结:亲和性调度就像关系亲密的闺蜜,你去哪儿我也去哪儿。反亲和性调度就像赌气的两个孩子,赌气永远不在一起玩儿。更多解释和实战详见下文。花10分钟看到最后,你肯定会有收获。
10分钟搞懂K8S的亲和与反亲和调度
|
8月前
|
存储 API 调度
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
496 0
|
算法 调度 Windows
|
算法 测试技术 iOS开发
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
本框架大部分代码都是有详细的注释,配合此教程系列,把流程梳理通应该是没有什么问题。
121 0
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
|
定位技术 调度 开发者
|
安全 Linux 调度

热门文章

最新文章