Oozie分布式任务的工作流——Spark篇

简介:

Spark是现在应用最广泛的分布式计算框架,oozie支持在它的调度中执行spark。在我的日常工作中,一部分工作就是基于oozie维护好每天的spark离线任务,合理的设计工作流并分配适合的参数对于spark的稳定运行十分重要。

Spark Action

这个Action允许执行spark任务,需要用户指定job-tracker以及name-node。先看看语法规则:

语法规则

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

prepare元素

它里面可以执行删除文件或者创建目录的操作,比如

<delete path="hdfs://xxxx/a"/>
<mkdir path="hdfs://xxxx"/>

一般来说,离线的spark任务最重都会生成一些数据,这些数据可能存储到数据库中,也可能直接存储到hdfs,如果存储到hdfs就涉及到清除目录了。比如你可能在测试环境需要频繁的重复运行spark任务,那么每次都需要清除目录文件,创建新的目录才行。

job-xml

spark 任务的参数也可以放在job-xml所在的xml中。

confugration

这里面的配置的参数将会传递给spark任务。

master

spark运行的模式,表示spark连接的集群管理器。默认可以使spark的独立集群(spark://host:port)或者是mesos(mesos://host:port)或者是yarn(yarn),以及本地模式local

mode

因为spark任务也可以看做主节点和工作节点模式,主节点就是驱动程序。这个驱动程序既可以运行在提交任务的机器,也可以放在集群中运行。

这个参数就是用来设置,驱动程序是以客户端的形式运行在本地机器,还是以集群模式运行在集群中。

name

spark应用的名字

class

spark应用的主函数

jar

spark应用的jar包

spark-opts

提交给驱动程序的参数。比如--conf key=value或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations

arg

这个参数是用来提交给spark应用的参数

例子

官网给出的例子:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstsparkjob">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>local[*]</master>
            <mode>client<mode>
            <name>Spark Example</name>
            <class>org.apache.spark.examples.mllib.JavaALS</class>
            <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
            <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
            <arg>inputpath=hdfs://localhost/input/file.txt</arg>
            <arg>value=2</arg>
        </spark>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

我自己工作时的例子:

<action name="NODE1">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn</master>
        <mode>cluster</mode>
        <name>NODE1</name>
        <class>com.test.NODE1_Task</class>
        <jar>/lib/dw.jar</jar>
        <spark-opts>--executor-memory 1G --num-executors 6 --executor-cores 1 --conf spark.storage.memoryFraction=0.8</spark-opts>
        <arg>参数1</arg>
        <arg>参数2</arg>
        <arg>参数3</arg>
    </spark>
</action>

日志

spark action日志会重定向到oozie的mapr启动程序的stdout/stderr中。

通过oozie的web控制条,可以看到spark的日志。

spark on yarn

如果想要把spark运行在yarn上,需要按照下面的步骤执行:

  • 在spark action中加载spark-assembly包
  • 指定master为yarn-client或者yarn-master

为了确保spark工作在spark历史服务器中可以查到,需要保证在--conf中或者oozie.service.SparkConfiturationService.spark.configrations中设置下面的三个参数:

  • spark.yarn.historyServer.address=http://spark-host:18088
  • spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
  • spark.eventLog.enabled=true

spark action的schema

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
           targetNamespace="uri:oozie:spark-action:0.1">    <xs:element name="spark" type="spark:ACTION"/>
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
</xs:schema>
本文转自博客园xingoo的博客,原文链接:Oozie分布式任务的工作流——Spark篇,如需转载请自行联系原博主。
相关文章
|
6月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
710 1
|
4月前
|
消息中间件 NoSQL Java
使用Java实现分布式任务调度器
使用Java实现分布式任务调度器
|
23天前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
40 1
|
3月前
|
资源调度 Java 调度
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
|
3月前
|
人工智能 监控 虚拟化
操作系统的演变:从单任务到多任务,再到并发和分布式
随着计算技术的发展,操作系统经历了从简单的单任务处理到复杂的多任务、并发处理,再到现代的分布式系统的转变。本文将探索这一演变过程中的关键里程碑,以及它们如何塑造我们今天使用的计算机系统的架构和性能。
|
4月前
|
人工智能 分布式计算 物联网
操作系统的演变:从单任务到多任务再到并发和分布式
在数字时代的浪潮中,操作系统作为计算机硬件与应用程序之间的桥梁,其发展史是一部技术革新与需求演进的史诗。本文将带领读者穿梭于操作系统的时空隧道,从早期简单而原始的单任务系统出发,一路见证它如何逐步进化为支持多任务、并发执行乃至分布式计算的复杂系统。我们将一探究竟,是什么推动了这些转变,它们又是如何影响我们日常的技术实践与生活的。
61 1
|
4月前
|
Web App开发 物联网 Unix
操作系统的演变:从单任务到多任务再到并发与分布式
本文旨在探讨操作系统的发展历程,着重分析其从处理单一任务的原始阶段,经历多任务处理能力的增强,直至支持并发计算和分布式架构的现代转型。我们将追溯关键时间节点,审视技术创新如何塑造了今日操作系统的复杂性与多样性,并预测未来可能的发展趋势。
|
6月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
186 0
|
6月前
|
SQL 分布式计算 Hadoop
Spark分布式内存计算框架
Spark分布式内存计算框架
152 0
|
6月前
|
存储 Kubernetes Cloud Native
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
105234 2

热门文章

最新文章