KnowStreaming系列教程第三篇——调度任务模块

简介: KnowStreaming系列教程第三篇——调度任务模块

前一篇文章

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();
        // 交由KS自定义的线程池,异步执行任务
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }
    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;
        // 120秒内无加载进来,则直接返回退出
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }
            BackoffUtils.backoff(1000);
        }
        if (tempClusterPhy == null) {
            return;
        }
        // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;
        // 集群执行集群元信息同步
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
        // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
        BackoffUtils.backoff(5000);
        // 集群集群指标采集
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配

public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();
            // 获取所有的任务
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 计算当前机器需要执行的任务
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 进行任务处理
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);
            //组装信息
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
            return taskResult;
        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);
            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md


相关文章
|
7月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(二)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
7月前
|
设计模式 前端开发 Java
KnowStreaming系列教程第二篇——项目整体架构分析
KnowStreaming系列教程第二篇——项目整体架构分析
88 0
|
7月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(一)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
7月前
|
存储 API 调度
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
419 0
|
算法 测试技术 iOS开发
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
本框架大部分代码都是有详细的注释,配合此教程系列,把流程梳理通应该是没有什么问题。
117 0
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
|
程序员 编译器 C++
C++(入门、核心、提高三篇)总结及补充
C++(入门、核心、提高三篇)总结及补充
114 0
C++(入门、核心、提高三篇)总结及补充
|
Android开发 iOS开发 计算机视觉
【第四篇】XiaoZaiMultiAutoAiDevices之核心机制
在上一期说到主要的流程和部分核心运行流程,这一期我们主讲:`如何通过外部参数指定脚本运行指定设备` 测试框架传参,可能一部分同学会想到unittest的DDT,使用pytest相关装饰器和各种外部文件的数据传入方式。
107 0
|
数据采集 Prometheus 监控
美团动态线程池思路开源框架(DynamicTp),监控及源码解析篇
大家好,动态线程池项目DynamicTp开源一个多月,目前400多star,说明还是比较受欢迎的,现在已经有一些小伙伴在接入使用或者即将接入使用了,为了项目以后更好的发展迭代,打算出几篇文章来对DynamicTp做一些更详细的介绍,有兴趣的小伙伴欢迎一起参与进来完善迭代项目。
708 1
美团动态线程池思路开源框架(DynamicTp),监控及源码解析篇
|
消息中间件 资源调度 分布式计算
实现一个任务调度系统,看这篇就够了
干货,教你如何实现一个任务调度系统
1601 2
实现一个任务调度系统,看这篇就够了
|
Java 程序员 网络安全
CoProcessFunction实战三部曲之二:状态处理
双流场景,处理一个流时如何得到另一个流的信息,或者如何将自己的信息传给另一个流的处理逻辑,本篇通过实战掌握其中关键
134 0
CoProcessFunction实战三部曲之二:状态处理

热门文章

最新文章