大数据常用调度平台

简介: 大数据常用调度平台

1. 项目结构

  • 目标
  • 理解项目中为什么需要调度平台
  • 步骤
  1. 项目介绍
  2. 项目结构

1.1. 项目介绍

项目功能

  • 标签创建
  • 标签计算
  • 标签审计


如果要对标签进行计算, 就需要把标签和标签的计算都管理起来


创建标签时, 执行标签计算任务

把计算和标签关联起来, 通过标签能找到任务, 通过任务也知道它计算的是什么标签

标签, 年龄就是一个标签

计算任务, 为了计算年龄标签, 需要去读取用户数据, 计算每一个用户是哪一个标签

用户画像数据, 每一个用户的标签, 就是用户画像数据

修改标签时, 标签计算任务同步修改

删除标签时, 标签计算任务停止

所以, 我们的项目需要和调度平台结合起来, 不仅管理标签本身, 也需要管理标签相关联的任务, 所以需要配合调度平台来管理


1.2. 项目结构

  • 管理者通过画像系统添加, 修改, 删除标签
  • 画像系统将标签抽象为标签和任务, 通过 Oozie 调度到 Yarn 中
  • Yarn 中的 Spark 任务读取 MySQL 中的标签元信息
  • Spark 任务通过 HBase 读取需要计算的数据
  • 计算完成后, 插入 HBase 对应的表中

2. Oozie 介绍

  • 目标
  • 了解 Oozie 的作用
  • 步骤
  1. 需求
  2. 可选的方式
  3. Oozie 和同类的对比

2.1. 需求

  • 现在要计算年龄标签, 有七个阶段, 50后、60后、70后、80后、90后、00后、10后、20后
  • 通过 Spark 写了一个程序, 读取用户年龄, 把每个用户的年龄映射到每个阶段内
  • 因为年龄标签变化的频率不高, 所以决定要一年计算一次
  • 如何完成这个计算任务的调度?

2.2. 可选的方式

2.2.1. Crontab

  • Crontab 是 Linux 的一个调度程序, 允许我们通过一个简洁的语法表示调度周期, 在对应的时间点周期性的执行某个 Shell 脚本或者 Shell 命令
# 分 时 日 月 星期 要运行的命令
# 每1分钟执行一次myCommand
* * * * * myCommand
# 每小时的第3和第15分钟执行
3,15 * * * * myCommand
# 在上午8点到11点的第3和第15分钟执行
3,15 8-11 * * * myCommand
# 每周一上午8点到11点的第3和第15分钟执行
3,15 8-11 * * 1 myCommand

我们可以编写一个 Shell 脚本, 其中写上

spark-submit --master yarn \
--class xx
--executor-memory 500m
--executor-cores 1
--num-executors 1
xx.jar
args1

然后再将如下的 cron 加入 crontab

0 20 * * * tag_age.shell

这样的方式有如下几个问题

  • cron 任务只能通过命令来管理
  • crontab -e
  • 如果换成如下的需求, 这个流程我们该如何控制呢?
  1. 把数据抽取到 HBase 中
  2. 同步执行如下两个任务
  • 合并用户数据和订单数据, 生成宽表
  • 合并用户数据和商品数据, 生成宽表
  1. 合并两个宽表, 计算用户标签
  • 如果执行的程序出错了, 只能登录到对应的主机查看 Log

2.2.2. Oozie

  1. 创建一个 Workflow, 表示一个任务的执行流程
  2. 创建一个 Coordinator, 表示任务的执行周期
  1. 通过 Hue 提交任务
  2. 通过 Hue 查看任务执行的 Log

2.3. Oozie 和竞品的对比

现在市面上比较多见的调度平台分别是 Oozie 和 Azkaban, 还有一个后起之秀 AirFlow

Airflow Azkaban Oozie
机构 Airbnb LinkedIn Apache
社区 Very Active Somewhat active Active
历史 4年 7 年 8 年
目的 General Purpose Batch Processing Hadoop 作业调度 Hadoop 作业调度
流程定义 Python Custom DSL XML
单节点 支持 Yes Yes
快速开始 支持 支持 不支持
高可用 Yes Yes Yes
单点故障 不, 使用 Yarn
运行模式 Push Push Poll
Rest API 触发 Yes Yes Yes
外部事件触发 Yes No Yes
Web 权限 LDAP/Password XML Password Kerberos
监控 Yes Limited Yes
可扩展性 Depending on executor setup Good Very Good
  • 在对工具的支持上, Airflow 和 Oozie 都很好, 但是 Airflow 要简便一些
  • 功能上, Oozie 最强大
  • 稳定性上, Oozie 最稳定
  • 上手速度, Azkaban 最快
  • 带有个人情绪的简评, Oozie 最稳定, 也最难以驾驭, 坑多

3. Oozie 组件

  • 目标
  • 理解 Oozie 各个组件
  • 步骤
  1. Workflow
  2. Coordinator

3.1. Workflow


Workflow 就是一个工作流, 从 start 开始, 到 end 结束

Workflow 中提供了 Fork 和 Join 机制

Fork 可以使得多个任务并行

Join 可以让并行的任务汇总

Workflow 中提供了 Decision 机制

类似于 if 一样, 可以提供分支选择功能

  • 在任务执行完成后, Workflow 可以以发送邮件, 短信等方式通知
  • Workflow 提供了一种类似 DAG 的方式来执行一组任务
  • Oozie 支持 MR, Hive, Spark, Shell 等的支持

3.2. Coordinator

  • Workflow 表示了任务的实现和关系
  • Oozie 提供了 Coodinator, 用来调度 Workflow
  • Coordinator 在 Hue 中叫做 Scheduler

4. 调度实现

  • 目标
  • 实现工程中的调度功能
  • 步骤
  1. 执行流程
  2. Workflow
  3. Coordinator
  4. Java 代码
  5. 调度 Workflow

4.1. 执行流程

  1. 用户在网页中添加四级标签, 因为四级标签代表标签, 五级标签代表标签值域
  1. 上传 Jar 包
  2. 提交标签
  1. 后端收到添加标签的请求, 存储标签信息并将 Jar 包上传到 HDFS
  • cn.oldlu.tag.web.basictag.controller.BasicTagController#modelUpload
  1. 用户在网页中开启标签计算
  2. 后端收到请求, 联系 Oozie 开始调度

添加标签的执行流程如下

  1. 调用了 BasicTagController
  2. 在 BasicTagController 中调用了 BasicTagService
  3. 保存 Tag 信息, 指的就是具体的标签数据, 比如说标签叫什么, 谁创建的, 什么时候创建的, 级别…
  4. 保存 Model 信息, 标签是需要计算的, 我们使用 Spark Job 计算, Main方法在哪个类, Jar 包的位置等…
  • 在保存之前, 把 Model 对应 Jar 包上传到 HDFS 中
  • 路径 /app/models/Tag_10/xx.jar

开启任务的流程如下

  1. 调用 TaskProcessing 中的方法
  1. TaskProcessing 会调用 EngineService 中的 start 方法
  • 传进来了一个 标签 ID

4.2. Workflow

  • Workflow 使用 XML 编写, 是一种通过配置表示过程的方式
  • Workflow 中的配置无需太过认真的了解, 大致知道流程即可
  • 因为一般情况下, 我们使用 Hue 或者 Ambari 的图形化方式去编辑 Workflow , 手写的可能性比较低
  • Workflow 中可以使用 EL 表达式填充信息
<workflow-app name="Extra goods from hive to hbase fully" xmlns="uri:oozie:workflow:0.5">
    <start to="tag-model-job"/>
    <action name="tag-model-job">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${sparkMaster}</master>
            <mode>${sparkDeployMode}</mode>
            <name>${oozieWFName}</name>
            <class>${sparkJobMain}</class>
            <jar>${sparkJobJar}</jar>
            <spark-opts>${sparkJobOpts}</spark-opts>
            <arg>${sparkMainOpts}</arg>
            <file>${sparkContainerCacheFiles}</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="End"/>
</workflow-app>

4.3. Coordinator

  • 此处的 Coordinator 只是为了周期性调度 Workflow
  • 在项目中提供的调度周期有: 日, 月, 年
<?xml version="1.0" encoding="UTF-8"?>
<coordinator-app name="cron-coordinator" frequency="${coord:days(1)}"
                 start="${start}" end="${end}" timezone="Asia/Shanghai" xmlns="uri:oozie:coordinator:0.5">
    <action>
        <workflow>
            <app-path>${oozieWorkflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

4.4. Java 代码

private boolean startEngineActually(EngineBean engineBean) {
        ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId()));
        MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId()));
        // 1. 判断模型和元数据是否存在, 如果不存在则启动失败
        HdfsUtil hdfsUtil = HdfsUtil.getInstance();
        if (!hdfsUtil.exist(modelBean.getModelPath())) {
            logger.error("模型包不存在, 启动失败");
            return false;
        }
        if (StringUtils.isBlank(metaDataBean.getInType())) {
            logger.error("模型元信息不存在或者缺失 InType, 启动失败");
            return false;
        }
        // 2. 上传 Oozie 配置到 HDFS
        String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/";
        String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent());
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath);
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath);
        // 3. 构建 Oozie job 参数
        OozieUtil oozie = oozieUtil.build();
        Properties oozieConf = oozie.getConf();
        // 3.1. Workflow 部分
        String separator = "/";
        oozieConf.setProperty("nameNode", nameNode);
        oozieConf.setProperty("sparkJobMain", modelBean.getModelMain());
        if (StringUtils.isNotBlank(modelBean.getArgs())) {
            oozieConf.setProperty("sparkJobOpts", modelBean.getArgs());
        }
        oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath());
        // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z
        oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator);
        oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName);
        // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01
        String[] scheduleArr = modelBean.getScheTime().split("#");
        String freq = scheduleArr[0];
        String startDateTime = scheduleArr[1];
        String endDateTime = scheduleArr[2];
        // 设置频率
        String freqStr = "";
        switch (freq) {
            case "每天":
                freqStr = "day";
                break;
            case "每周":
                freqStr = "week";
                break;
            case "每月":
                freqStr = "month";
                break;
            case "每年":
                freqStr = "year";
                break;
        }
        oozieConf.setProperty("freq", freqStr);
        // 设置开始时间
        String[] startDateTimeArr = startDateTime.split(" ");
        String startDate = startDateTimeArr[0];
        String startTime = startDateTimeArr[1];
        oozieConf.setProperty("start", startDate + "T" + startTime + "Z");
        // 设置结束时间
        String[] endDateTimeArr = endDateTime.split(" ");
        String endDate = endDateTimeArr[0];
        String endTime = endDateTimeArr[1];
        oozieConf.setProperty("end", endDate + "T" + endTime + "Z");
        logger.info(oozieConf.toString());
        // 4. 提交 Oozie 任务
        String jobId = oozie.start(oozieConf);
        // 5. 设置监控
        EngineBean mEngineBean = new EngineBean();
        mEngineBean.setJobid(jobId);
        mEngineBean.setTagId(engineBean.getTagId());
        mEngineBean.setStatus("3");
        int state = engineMapper.addMonitorInfo(mEngineBean);
        return state > 0;
    }

4.5. 执行流程

  1. 上传 Jar 包
  2. 创建四级标签
  3. 开启标签任务
  4. Hue 中查看
  • 注意把 Filter 清空

4.5. 调度 Workflow

因为在测试阶段, 我们希望每次提交任务都立刻执行, 所以可以注释掉提交 Oozie 任务的代码, 改为直接提交 Workflow 任务

  • 去掉 Coordinator 的位置, 改为 APP_PATH, 则会直接找到 Workflow 提交
    private boolean startEngineActually(EngineBean engineBean) {
        ModelBean modelBean = modelMapper.get(new ModelBean(engineBean.getTagId()));
        MetaDataBean metaDataBean = metaDataMapper.get(new MetaDataBean(engineBean.getTagId()));
        // 1. 判断模型和元数据是否存在, 如果不存在则启动失败
        HdfsUtil hdfsUtil = HdfsUtil.getInstance();
        if (!hdfsUtil.exist(modelBean.getModelPath())) {
            logger.error("模型包不存在, 启动失败");
            return false;
        }
        if (StringUtils.isBlank(metaDataBean.getInType())) {
            logger.error("模型元信息不存在或者缺失 InType, 启动失败");
            return false;
        }
        // 2. 上传 Oozie 配置到 HDFS
        String localOozieConfigPath = engineBean.getRemark() + oozieConfigDirName + "/";
        String tagModelPath = hdfsUtil.getPath(new File(modelBean.getModelPath()).getParent());
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + workflowFileName, tagModelPath);
        hdfsUtil.uploadLocalFile2HDFS(localOozieConfigPath + coordinatorFileName, tagModelPath);
        // 3. 构建 Oozie job 参数
        OozieUtil oozie = oozieUtil.build();
        Properties oozieConf = oozie.getConf();
        // 3.1. Workflow 部分
        String separator = "/";
        oozieConf.setProperty("nameNode", nameNode);
        oozieConf.setProperty("sparkJobMain", modelBean.getModelMain());
        if (StringUtils.isNotBlank(modelBean.getArgs())) {
            oozieConf.setProperty("sparkJobOpts", modelBean.getArgs());
        }
        oozieConf.setProperty("sparkJobJar", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty("sparkContainerCacheFiles", "${nameNode}" + modelBean.getModelPath());
        oozieConf.setProperty(OozieClient.APP_PATH, "${nameNode}" + tagModelPath + separator + workflowFileName);
        // 3.2. Coordinator 部分, Coordinator 中的时间格式是 UTC, 如 2018-06-019T20:01Z
//        oozieConf.setProperty("oozieWorkflowPath", "${nameNode}" + tagModelPath + separator);
//        oozieConf.setProperty("oozie.coord.application.path", "${nameNode}" + tagModelPath + separator + coordinatorFileName);
//
//        // ScheTime 的格式为 每天#2018-06-01 20::01#2018-06-01 20::01
//        String[] scheduleArr = modelBean.getScheTime().split("#");
//        String freq = scheduleArr[0];
//        String startDateTime = scheduleArr[1];
//        String endDateTime = scheduleArr[2];
//
//        // 设置频率
//        String freqStr = "";
//        switch (freq) {
//            case "每天":
//                freqStr = "day";
//                break;
//            case "每周":
//                freqStr = "week";
//                break;
//            case "每月":
//                freqStr = "month";
//                break;
//            case "每年":
//                freqStr = "year";
//                break;
//        }
//        oozieConf.setProperty("freq", freqStr);
//
//        // 设置开始时间
//        String[] startDateTimeArr = startDateTime.split(" ");
//        String startDate = startDateTimeArr[0];
//        String startTime = startDateTimeArr[1];
//        oozieConf.setProperty("start", startDate + "T" + startTime + "Z");
//
//        // 设置结束时间
//        String[] endDateTimeArr = endDateTime.split(" ");
//        String endDate = endDateTimeArr[0];
//        String endTime = endDateTimeArr[1];
//        oozieConf.setProperty("end", endDate + "T" + endTime + "Z");
//
//        logger.info(oozieConf.toString());
        // 4. 提交 Oozie 任务
        String jobId = oozie.start(oozieConf);
        // 5. 设置监控
        EngineBean mEngineBean = new EngineBean();
        mEngineBean.setJobid(jobId);
        mEngineBean.setTagId(engineBean.getTagId());
        mEngineBean.setStatus("3");
        int state = engineMapper.addMonitorInfo(mEngineBean);
        return state > 0;
    }


相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
4月前
|
存储 SQL 分布式计算
开源大数据比对平台设计与实践—dataCompare
开源大数据比对平台设计与实践—dataCompare
70 0
|
4月前
|
SQL 存储 大数据
某互联网大厂亿级大数据服务平台的建设和实践
某互联网大厂亿级大数据服务平台的建设和实践
68 0
|
4月前
|
SQL 大数据 关系型数据库
开源大数据比对平台(dataCompare)新版本发布
开源大数据比对平台(dataCompare)新版本发布
74 0
|
4月前
|
SQL 存储 分布式计算
从0到1介绍一下开源大数据比对平台dataCompare
从0到1介绍一下开源大数据比对平台dataCompare
124 0
|
5月前
|
人工智能 Cloud Native 大数据
构建高性能云原生大数据处理平台:融合人工智能优化数据分析流程
构建高性能云原生大数据处理平台:融合人工智能优化数据分析流程
193 0
|
2月前
|
存储 分布式计算 调度
MaxCompute资源问题之删除调度资源如何解决
MaxCompute资源指的是在MaxCompute项目中使用的计算资源和存储资源;本合集旨在向用户展示如何高效管理MaxCompute资源,包括资源包管理、配额调整和性能优化等方面。
30 0
|
2月前
|
监控 物联网 大数据
智慧工地管理平台系统源码基于物联网、云计算、大数据等技术
智慧工地平台APP通过对施工过程人机料法环的全面感知、互联互通、智能协同,提高施工现场的生产效率、管理水平和决策能力,实现施工管理的数字化、智能化、精益化。
56 0
|
3月前
|
SQL 分布式计算 HIVE
开源湖仓一体平台(二):Arctic(上篇)
开源湖仓一体平台(二):Arctic(上篇)
开源湖仓一体平台(二):Arctic(上篇)
|
3月前
|
SQL 消息中间件 分布式计算
开源湖仓一体平台(一):LakeSoul
开源湖仓一体平台(一):LakeSoul
|
4月前
|
SQL 存储 大数据
从0到1介绍一下开源大数据服务平台dataService
从0到1介绍一下开源大数据服务平台dataService
116 1

热门文章

最新文章