KnowStreaming系列教程第三篇——调度任务模块

简介: KnowStreaming系列教程第三篇——调度任务模块

前一篇文章

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();
        // 交由KS自定义的线程池,异步执行任务
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }
    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;
        // 120秒内无加载进来,则直接返回退出
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }
            BackoffUtils.backoff(1000);
        }
        if (tempClusterPhy == null) {
            return;
        }
        // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;
        // 集群执行集群元信息同步
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
        // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
        BackoffUtils.backoff(5000);
        // 集群集群指标采集
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配

public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();
            // 获取所有的任务
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 计算当前机器需要执行的任务
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 进行任务处理
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);
            //组装信息
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
            return taskResult;
        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);
            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md


相关文章
|
7月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(二)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
存储 数据采集 人工智能
如何设计一个监控平台(上篇)
在大型分布式微服务场景下,各个服务版本快速迭代,各类业务规模不断膨胀,同时监控的场景也在不断的发生变化,线上故障随时可能发生,各个平台错综复杂,如何保证线上服务稳定运行,同时提升运维效率,降低运维成本成了监控平台的挑战。
如何设计一个监控平台(上篇)
|
7月前
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)(一)
【sgTopo】强哥古法炮制、纯手工打造简单拓扑图、流程图、思维导图组件(完善中ing)
|
存储
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)(三)
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)
356 0
|
XML Java 数据库
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)(一)
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)
347 0
|
数据处理 数据安全/隐私保护
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)(二)
Flowable:关于流程部署、启动、处理、完成各模块的浅析(图解)
465 0
|
算法 测试技术 iOS开发
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
本框架大部分代码都是有详细的注释,配合此教程系列,把流程梳理通应该是没有什么问题。
117 0
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
|
程序员 编译器 C++
C++(入门、核心、提高三篇)总结及补充
C++(入门、核心、提高三篇)总结及补充
114 0
C++(入门、核心、提高三篇)总结及补充
|
消息中间件 资源调度 分布式计算
实现一个任务调度系统,看这篇就够了
干货,教你如何实现一个任务调度系统
1599 2
实现一个任务调度系统,看这篇就够了
|
Java 程序员 网络安全
CoProcessFunction实战三部曲之一:基本功能
从CoProcessFunction的基本功能入手,为后面的状态、定时实战打好基础
154 0
CoProcessFunction实战三部曲之一:基本功能
下一篇
DataWorks