Oozie WorkFlow中MR Action使用讲解

简介: 笔记

一、如何开发workflow


1.参考官网给我们的example

2.workflow三个关键点

   1)自定义:job.properties 
   2)自定义:workflow.xml
    <start>
    <action>
       <ok to="end"/>
       <error to="fail"/>
    </action>
    <kill>
    <end>
   3)依赖包 lib目录


二、Oozie workflow中使用MapReduce Action


目的:使用Ooize调度MapReduce程序。

方式:将以前Java MapReduce程序中的Driver部分写到workflow.xml中的configuration里面。

示例:用Oozie安装包中自带的examples例子跑wordcount程序。


第一步: 先做准备工作,根据examples模版自己开发,在oozie目录下新建一个oozie-apps目录,将examples中的map-reduce目录拷贝到oozie-apps目录下,再将oozie-apps目录下job-with-config-class.properties和workflow-with-config-class.xml两个文件删除,只留下job.properties、lib、workflow.xml这三个。


第二步: 编写job.prperties文件

内容如下:

nameNode=hdfs://bigdata-pro-m01:9000
jobTracker=bigdata-pro-m01:8032
queueName=default
oozieAppRoot=user/caizhengjie/oozie-apps
oozieDataRoot=user/caizhengjie/oozie-datas
oozie.wf.application.path=${nameNode}/${oozieAppRoot}/map-reduce/workflow.xml
outputDir=map-reduce/output
inputDir=map-reduce/input

第三步: 在HDFS上创建数据输入的目录并运行wordcount的MR程序

bin/hdfs dfs -mkdir -p /user/caizhengjie/oozie-datas/map-reduce/input

将数据放到/user/caizhengjie/oozie-datas/map-reduce/input下

bin/hdfs dfs -put /opt/datas/wordcount.txt /user/caizhengjie/oozie-datas/map-reduce/input

运行wordcount的MR程序

bin/hadoop jar /opt/jars/bigdata_study.jar /user/caizhengjie/oozie-datas/map-reduce/input/wordcount.txt /user/caizhengjie/oozie-datas/map-reduce/output

第四步: 编写workflow.xml文件

运行了MR程序之后,在19888web界面找配置项写到workflow.xml文件中。

10.png

编写workflow.xml文件,内容如下:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="kfk-mr">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${inputDir}/wordcount.txt</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>com.kfk.hadoop.mr.WordCountMR$WordCountMapper</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>com.kfk.hadoop.mr.WordCountMR$WordCountReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

第五步: 将oozie-apps放在HDFS上

bin/hdfs dfs -put /opt/modules/oozie/oozie-apps /user/caizhengjie/

第六步:/user/caizhengjie/oozie-apps/map-reduce/lib里面的oozie-examples-4.1.0-cdh5.9.3.jar删除

rm -rf oozie-examples-4.1.0-cdh5.9.3.jar

第七步: 将MR的程序jar包放到opt/modules/oozie/oozie-apps/map-reduce/lib目录下

cp /opt/jars/bigdata_study.jar /opt/modules/oozie/oozie-apps/map-reduce/lib

第八步: 将oozie-apps上传到HDFS用户目录下的oozie-apps中

bin/hdfs dfs -put /opt/modules/oozie/oozie-apps/map-reduce /user/caizhengjie/oozie-apps

第九步: 运行mr action

bin/oozie job -oozie http://bigdata-pro-m01:11000/oozie -config oozie-apps/map-reduce/job.properties -run


如果要更改workflow.xml文件的话,需要将HDFS上的workflow.xml文件删除,然后重新上传

bin/hdfs dfs -rm -r /user/caizhengjie/oozie-apps/map-reduce/workflow.xml
bin/hdfs dfs -put /opt/modules/oozie/oozie-apps/map-reduce/workflow.xml /user/caizhengjie/oozie-apps/map-reduce

出现的问题:

ACTION[0000001-181019035430741-oozie-kfk-W@mr-node] Launcher exception: mapreduce.job.map.class is incompatible with map compatability mode.
java.io.IOException: mapreduce.job.map.class is incompatible with map compatability mode.

解决方法:

在workflow.xml文件中的configuration节点中加入如下的属性项配置

<property>
    <name>mapred.mapper.new-api</name>
    <value>true</value>
</property>
<property>
    <name>mapred.reducer.new-api</name>
    <value>true</value>
</property>

查看运行结果:

11.png

查看MR的运行结果

[caizhengjie@bigdata-pro-m01 hadoop]$ bin/hdfs dfs -text /user/caizhengjie/oozie-datas/map-reduce/output/part-r-00000
flink   2
hadoop  3
hbase   3
hive    4
hue     1
java    6
kafka   2
linux   1
python  4
spark   6
storm   1
unix    1
zookeeper       2
相关文章
68 Azkaban Command类型多job工作流flow
68 Azkaban Command类型多job工作流flow
61 0
|
5月前
|
分布式计算 资源调度 监控
什么是 Spark DAG?
【8月更文挑战第14天】
400 5
|
7月前
|
存储 监控 测试技术
Agent Workflow
【6月更文挑战第25天】
209 3
|
SQL IDE API
Flink Streaming Job中没有定义任何Operator
Flink Streaming Job中没有定义任何Operator
65 1
|
8月前
|
存储 监控 调度
【Flink】怎么提交的实时任务,有多少Job Manager?
【4月更文挑战第18天】【Flink】怎么提交的实时任务,有多少Job Manager?
|
SQL 分布式计算 资源调度
Spark on Yarn Job的执行流程简介
2017-12-19-Hadoop2.0架构及HA集群配置(1) 2017-12-24-Hadoop2.0架构及HA集群配置(2) 2017-12-25-Spark集群搭建 2017-12-29-Hadoop和Spark的异同 2017-12-28-Spark-HelloWorld(Spark开发环境搭建)
|
XML SQL 分布式计算
Apache Oozie- 节点类型 (control flow. action) &amp; 工作流类型 (coordinator. bundle)|学习笔记
快速学习 Apache Oozie- 节点类型 (control flow. action) &amp; 工作流类型 (coordinator. bundle)
Apache Oozie- 节点类型 (control flow. action) &amp; 工作流类型 (coordinator. bundle)|学习笔记