KnowStreaming系列教程第三篇——调度任务模块

简介: KnowStreaming系列教程第三篇——调度任务模块

前一篇文章

讲述了KS的整体项目目录,这边文章来讲述下KS在调度模块里面对于指标采集和元数据同步

一、调度模块代码主要在km-task里面

public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
    private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
    @Override
    public void onApplicationEvent(ClusterPhyAddedEvent event) {
        LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
        Long now = System.currentTimeMillis();
        // 交由KS自定义的线程池,异步执行任务
        FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
    }
    private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
        ClusterPhy tempClusterPhy = null;
        // 120秒内无加载进来,则直接返回退出
        while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
            tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
            if (tempClusterPhy != null) {
                break;
            }
            BackoffUtils.backoff(1000);
        }
        if (tempClusterPhy == null) {
            return;
        }
        // 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
        BackoffUtils.backoff(5000);
        final ClusterPhy clusterPhy = tempClusterPhy;
        // 集群执行集群元信息同步
        List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
        for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
        // 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
        BackoffUtils.backoff(5000);
        // 集群集群指标采集
        List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
        for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
            try {
                dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
            } catch (Exception e) {
                // ignore
            }
        }
    }
}

通过监听集群添加事件,触发元数据同步和指标采集调度任务

具体实现可参考:

spring 根据接口或者抽象类获取子类执行: https://blog.csdn.net/u012501054/article/details/103927674

二、调度任务分布式系统如何做到单节点运行,避免多台机器调度

AbstractDispatchTask 里面的execute 方法通过实现任务分配

public TaskResult execute(JobContext jobContext) {
        try {
            long triggerTimeUnitMs = System.currentTimeMillis();
            // 获取所有的任务
            List<E> allTaskList = this.listAllTasks();
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("all-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 计算当前机器需要执行的任务
            List<E> subTaskList = this.selectTask(allTaskList, jobContext.getAllWorkerCodes(), jobContext.getCurrentWorkerCode());
            if (ValidateUtils.isEmptyList(allTaskList)) {
                LOGGER.debug("sub-task is empty, finish process, taskName:{} jobContext:{}", taskName, jobContext);
                return TaskResult.SUCCESS;
            }
            // 进行任务处理
            TaskResult ret = this.processTask(subTaskList, triggerTimeUnitMs);
            //组装信息
            TaskResult taskResult = new TaskResult();
            taskResult.setCode(ret.getCode());
            taskResult.setMessage(ConvertUtil.list2String(subTaskList, ","));
            return taskResult;
        } catch (Exception e) {
            LOGGER.error("process task failed, taskName:{}", taskName, e);
            return new TaskResult(TaskResult.FAIL_CODE, e.toString());
        }
    }

对应代码解释如下:

参考:

https://github.com/didi/KnowStreaming/blob/master/docs/dev_guide/Task%E6%A8%A1%E5%9D%97%E7%AE%80%E4%BB%8B.md


相关文章
|
4月前
|
搜索推荐 Python
Python上下文管理器DIY指南:从入门到精通,轻松驾驭资源管理
【7月更文挑战第6天】Python的上下文管理器是资源管理的利器,简化文件操作、网络连接等场景。通过定义类及`__enter__`、`__exit__`方法,可自定义管理器,如示例中的`MyContextManager`,实现资源获取与释放。使用with语句,提升代码可读性和维护性,不仅用于基本资源管理,还可扩展到事务控制、自动重试等高级应用,让编程更加高效和灵活。
58 0
|
6月前
|
设计模式 前端开发 Java
KnowStreaming系列教程第二篇——项目整体架构分析
KnowStreaming系列教程第二篇——项目整体架构分析
80 0
|
19天前
|
存储 C++ UED
【实战指南】4步实现C++插件化编程,轻松实现功能定制与扩展
本文介绍了如何通过四步实现C++插件化编程,实现功能定制与扩展。主要内容包括引言、概述、需求分析、设计方案、详细设计、验证和总结。通过动态加载功能模块,实现软件的高度灵活性和可扩展性,支持快速定制和市场变化响应。具体步骤涉及配置文件构建、模块编译、动态库入口实现和主程序加载。验证部分展示了模块加载成功的日志和配置信息。总结中强调了插件化编程的优势及其在多个方面的应用。
165 61
|
14天前
|
JavaScript 前端开发 安全
轻松上手Web Worker:多线程解决方案的使用方法与实战指南
轻松上手Web Worker:多线程解决方案的使用方法与实战指南
34 0
|
6月前
|
前端开发 JavaScript UED
第五章(原理篇) 微前端技术之模块联邦与动态加载
第五章(原理篇) 微前端技术之模块联邦与动态加载
275 0
|
4月前
|
JavaScript 前端开发 jenkins
【开发脚手架】系列教程 -1- 脚手架的价值,功能,命令详解,执行原理图解
【开发脚手架】系列教程 -1- 脚手架的价值,功能,命令详解,执行原理图解
47 2
|
6月前
|
小程序 JavaScript 前端开发
【经验分享】如何实现小程序代码热更新| 江海计划
【经验分享】如何实现小程序代码热更新| 江海计划
114 8
|
6月前
|
存储 API 调度
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
FreeRTOS深入教程(任务创建的深入和任务调度机制分析)
280 0
|
传感器 调度 开发者
【Freertos基础教程】任务管理之基本使用
【Freertos基础教程】任务管理之基本使用
126 0
|
算法 测试技术 iOS开发
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程
本框架大部分代码都是有详细的注释,配合此教程系列,把流程梳理通应该是没有什么问题。
107 0
【第三篇】XiaoZaiMultiAutoAiDevices之运行流程