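Note: the project's entire data-processing chain, from data collection through data analysis to exporting the result data, is split into several Oozie workflows, and a coordinator schedules and coordinates them.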
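The split into workflows can be modeled as a small dependency graph: each workflow is a node, and each data hand-off is an edge, so the stages run in topological order. The Python sketch below only illustrates that idea; it is not how Oozie itself schedules jobs. The stage names reuse the application directories from the upload step in this note (weblog, hive2-etl, hive2-dw), and the edges between them are an assumption.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical dependency map: key = Oozie application directory,
# value = set of applications whose output it consumes (assumed edges).
DEPENDENCIES = {
    "weblog": set(),            # MR log preprocessing
    "hive2-etl": {"weblog"},    # load preprocessed logs into Hive tables
    "hive2-dw": {"hive2-etl"},  # PV statistics on the detail tables
}


def run_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every workflow runs after all of its inputs."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(run_order(DEPENDENCIES))
```

A cyclic map makes `static_order()` raise `graphlib.CycleError`, which is the programmatic counterpart of a pipeline that can never start.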
Workflow definition examples
Oozie configuration fragments (see the project source for the complete files)
1. Workflow definition for the log-preprocessing MR program
<workflow-app name="weblogpreprocess" xmlns="uri:oozie:workflow:0.4">
    <start to="firstjob"/>
    <action name="firstjob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${outpath}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>cn.itcast.bigdata.hive.mr.WeblogPreProcess$WeblogPreProcessMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inpath}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outpath}</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
    <kill name="kill">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
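Every transition in a workflow (`<start to>`, `<ok to>`, `<error to>`, and the `to` of join/decision nodes) must name a node defined in the same workflow-app; Oozie rejects a definition whose `<error to="kill"/>` points at a missing `kill` node when it parses the workflow. The Python sketch below is a hypothetical pre-upload check (not an Oozie tool) that lists such dangling targets using only the standard library. It deliberately ignores fork `<path start=...>` entries.

```python
import xml.etree.ElementTree as ET


def dangling_transitions(workflow_xml: str) -> list[str]:
    """List transition targets that do not name any top-level workflow node."""
    root = ET.fromstring(workflow_xml)
    # Top-level nodes (action, kill, end, decision, fork, join) carry a "name".
    names = {el.get("name") for el in root if el.get("name")}
    # Transitions (<start>, <ok>, <error>, <join>, <case>, <default>) store
    # their target in a "to" attribute; fork <path start=...> is not checked.
    return [
        el.get("to")
        for el in root.iter()
        if el.get("to") is not None and el.get("to") not in names
    ]
```

For example, `dangling_transitions(open("workflow.xml", encoding="utf-8").read())` returns an empty list for a complete workflow and `["kill"]` for one that transitions to an undefined kill node.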
2. Workflow definition for the data-loading ETL
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>
    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>jdbc:hive2://hdp-node-01:10000</jdbc-url>
            <script>script.q</script>
            <param>input=/weblog/outpre2</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
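When the hive2 action fails, it often helps to run the same script by hand with Beeline. The Python sketch below builds a command line that roughly mirrors what the action hands to Beeline: the JDBC URL, each `<param>` as a `--hivevar name=value`, and the script file. The exact argument list the Oozie launcher uses is an assumption here; it may add further options such as the driver class or credentials.

```python
import shlex
from typing import Optional


def beeline_args(jdbc_url: str, script: str, params: dict[str, str],
                 user: Optional[str] = None) -> list[str]:
    """Build a Beeline argv that approximates one Oozie hive2 action run.

    Each <param>name=value</param> becomes a --hivevar option, so the
    script could reference it as ${name}.
    """
    args = ["beeline", "-u", jdbc_url]
    if user:
        args += ["-n", user]
    for name, value in params.items():
        args += ["--hivevar", f"{name}={value}"]
    args += ["-f", script]
    return args


if __name__ == "__main__":
    argv = beeline_args("jdbc:hive2://hdp-node-01:10000", "script.q",
                        {"input": "/weblog/outpre2"}, user="hadoop")
    print(" ".join(shlex.quote(a) for a in argv))
```

Running the printed command on hdp-node-01 from the directory that holds script.q reproduces the action outside Oozie, which separates HiveQL errors from Oozie configuration errors.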
3. Hive script used by the data-loading workflow
create database if not exists dw_weblog;
use dw_weblog;
drop table if exists t_orgin_weblog;
create table t_orgin_weblog(
    valid string, remote_addr string, remote_user string, time_local string,
    request string, status string, body_bytes_sent string, http_referer string,
    http_user_agent string)
row format delimited fields terminated by '\001';
load data inpath '/weblog/preout' overwrite into table t_orgin_weblog;
drop table if exists t_ods_detail_tmp_referurl;
create table t_ods_detail_tmp_referurl as
SELECT a.*, b.*
FROM t_orgin_weblog a
LATERAL VIEW parse_url_tuple(regexp_replace(http_referer, "\"", ""), 'HOST', 'PATH', 'QUERY', 'QUERY:id') b as host, path, query, query_id;
drop table if exists t_ods_detail;
create table t_ods_detail as
select b.*,
    substring(time_local, 0, 11) as daystr,
    substring(time_local, 13) as tmstr,
    substring(time_local, 4, 3) as month,
    substring(time_local, 0, 2) as day,
    substring(time_local, 13, 2) as hour
from t_ods_detail_tmp_referurl b;
drop table if exists t_ods_detail_prt;
create table t_ods_detail_prt(
    valid string, remote_addr string, remote_user string, time_local string,
    request string, status string, body_bytes_sent string, http_referer string,
    http_user_agent string, host string, path string, query string, query_id string,
    daystr string, tmstr string, month string, day string, hour string)
partitioned by (mm string, dd string);
insert into table t_ods_detail_prt partition(mm='Sep', dd='18')
select * from t_ods_detail where daystr='18/Sep/2013';
insert into table t_ods_detail_prt partition(mm='Sep', dd='19')
select * from t_ods_detail where daystr='19/Sep/2013';
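The column arithmetic in this script depends on Hive's 1-based `substr` (position 0 behaves like 1, negative positions count from the end), and the referer columns depend on `parse_url_tuple`. The Python sketch below reproduces both on a single row so the offsets can be checked without a cluster. It assumes `time_local` looks like `18/Sep/2013:06:49:18`, which is what the `daystr='18/Sep/2013'` filter implies. The URL part is only an approximation of `parse_url_tuple` (for example, `urlsplit` lower-cases the host, and percent-encoding is left alone).

```python
from typing import Optional
from urllib.parse import urlsplit


def hive_substr(s: str, pos: int, length: Optional[int] = None) -> str:
    """Mimic Hive substr(): 1-based pos, 0 treated as 1, negative from the end."""
    if abs(pos) > len(s):
        return ""
    start = pos - 1 if pos > 0 else (len(s) + pos if pos < 0 else 0)
    if length is None:
        return s[start:]
    return s[start:start + length] if length > 0 else ""


def time_columns(time_local: str) -> dict[str, str]:
    """The five derived columns of t_ods_detail, computed like the script does."""
    return {
        "daystr": hive_substr(time_local, 0, 11),
        "tmstr": hive_substr(time_local, 13),
        "month": hive_substr(time_local, 4, 3),
        "day": hive_substr(time_local, 0, 2),
        "hour": hive_substr(time_local, 13, 2),
    }


def parse_referer(http_referer: str):
    """Approximate (host, path, query, query_id) from parse_url_tuple."""
    url = http_referer.replace('"', "")  # same as regexp_replace(..., "\"", "")
    parts = urlsplit(url)
    if not parts.scheme or not parts.netloc:
        return (None, None, None, None)  # e.g. "-" when no referer was sent
    query_id = None
    for pair in parts.query.split("&") if parts.query else []:
        key, _, value = pair.partition("=")
        if key == "id":
            query_id = value
            break
    return (parts.hostname, parts.path, parts.query or None, query_id)
```

For `18/Sep/2013:06:49:18`, `time_columns` yields daystr `18/Sep/2013`, tmstr `06:49:18`, month `Sep`, day `18` and hour `06`, matching the partition filter used in the inserts.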
Workflow unit testing
1. Upload the workflow definitions and configuration
[hadoop@hdp-node-01 wf-oozie]$ hadoop fs -put hive2-etl /user/hadoop/oozie/myapps/
[hadoop@hdp-node-01 wf-oozie]$ hadoop fs -put hive2-dw /user/hadoop/oozie/myapps/
[hadoop@hdp-node-01 wf-oozie]$ ll
total 12
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 23 16:32 hive2-dw
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 23 16:32 hive2-etl
drwxrwxr-x. 3 hadoop hadoop 4096 Nov 23 11:24 weblog
[hadoop@hdp-node-01 wf-oozie]$ export OOZIE_URL=http://localhost:11000/oozie
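2. Submit and start each workflow
Start the log-preprocessing MR workflow (input and output paths are passed with -D):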
oozie job -D inpath=/weblog/input -D outpath=/weblog/outpre -config weblog/job.properties -run
Start the ETL Hive workflow:
oozie job -config hive2-etl/job.properties -run
Start the Hive workflow that computes the PV (page view) statistics:
oozie job -config hive2-dw/job.properties -run
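In the first command, `-D inpath=...` and `-D outpath=...` supply the `${inpath}` and `${outpath}` values used by the MR workflow. Assuming Oozie's usual precedence, config-default.xml in the application directory provides defaults, the file passed with `-config` overrides them, and `-D` on the command line overrides both. The small Python model below captures only that ordering; it is an illustration, not Oozie code, and the sample values are hypothetical.

```python
def effective_config(defaults: dict[str, str], job_props: dict[str, str],
                     cli_overrides: dict[str, str]) -> dict[str, str]:
    """Merge configuration layers; later layers win on duplicate keys."""
    merged = dict(defaults)        # config-default.xml in the app directory
    merged.update(job_props)       # properties file given with -config
    merged.update(cli_overrides)   # -D name=value on the oozie command line
    return merged


if __name__ == "__main__":
    defaults = {"queueName": "default"}          # as in config-default.xml
    job_props = {"outpath": "/weblog/tmp"}       # hypothetical job.properties entry
    overrides = {"inpath": "/weblog/input", "outpath": "/weblog/outpre"}
    print(effective_config(defaults, job_props, overrides))
```

Under this model, a path given with `-D` always wins, which is why the same job.properties can be reused for different input directories.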
3. Coordinator configuration for the workflows (fragment)
The workflow jobs are organized and scheduled by a coordinator. The hive2-etl application directory contains:
[hadoop@hdp-node-01 hive2-etl]$ ll
total 28
-rw-rw-r--. 1 hadoop hadoop  265 Nov 13 16:39 config-default.xml
-rw-rw-r--. 1 hadoop hadoop  512 Nov 26 16:43 coordinator.xml
-rw-rw-r--. 1 hadoop hadoop  382 Nov 26 16:49 job.properties
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 27 11:26 lib
-rw-rw-r--. 1 hadoop hadoop 1910 Nov 23 17:49 script.q
-rw-rw-r--. 1 hadoop hadoop  687 Nov 23 16:32 workflow.xml
config-default.xml:
<configuration>
    <property>
        <name>jobTracker</name>
        <value>hdp-node-01:8032</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://hdp-node-01:9000</value>
    </property>
    <property>
        <name>queueName</name>
        <value>default</value>
    </property>
</configuration>
job.properties:
user.name=hadoop
oozie.use.system.libpath=true
oozie.libpath=hdfs://hdp-node-01:9000/user/hadoop/share/lib
oozie.wf.application.path=hdfs://hdp-node-01:9000/user/hadoop/oozie/myapps/hive2-etl/
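config-default.xml and job.properties carry the same kind of name/value pairs in two syntaxes. A short Python sketch (standard library only) for reading both into dictionaries, for example to compare environments before uploading. The `.properties` reader is deliberately simplified: it handles `key=value` lines and `#`/`!` comments, not Java's full format (`:` separators, line continuations, escapes).

```python
import xml.etree.ElementTree as ET


def parse_config_xml(text: str) -> dict[str, str]:
    """Read a Hadoop-style <configuration> document into a name -> value dict."""
    root = ET.fromstring(text)
    return {p.findtext("name"): p.findtext("value") for p in root.findall("property")}


def parse_properties(text: str) -> dict[str, str]:
    """Simplified .properties reader: 'key=value' lines, '#' or '!' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props
```

Splitting on the first `=` keeps values such as `hdfs://hdp-node-01:9000/...` intact.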
workflow.xml: identical to the data-loading ETL workflow definition in item 2 above.
coordinator.xml:
<coordinator-app name="cron-coord" frequency="${coord:minutes(5)}" start="${start}" end="${end}" timezone="Asia/Shanghai"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
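Two points are worth checking before submitting this coordinator. First, it references `${start}`, `${end}` and `${workflowAppUri}`, which do not appear in the job.properties or config-default.xml shown above, so they must be supplied elsewhere (for instance in a coordinator job.properties that sets `oozie.coord.application.path` instead of `oozie.wf.application.path`). Second, with `frequency="${coord:minutes(5)}"` a workflow run is materialized roughly every 5 minutes from start until (but not including) end. The Python sketch below checks both; the start/end timestamps in the example are made up.

```python
import re
from datetime import datetime, timedelta

# Plain ${name} references only; EL functions such as ${coord:minutes(5)}
# are not matched because ':' and '(' fall outside the name pattern.
VAR_RE = re.compile(r"\$\{([A-Za-z_][\w.]*)\}")


def undefined_variables(xml_text: str, defined: set[str]) -> list[str]:
    """Return the ${...} variables used in the file that no property defines."""
    return sorted(set(VAR_RE.findall(xml_text)) - defined)


def nominal_times(start: str, end: str, minutes: int) -> list[str]:
    """Nominal times of a fixed-frequency coordinator: start, start+f, ... < end."""
    fmt = "%Y-%m-%dT%H:%MZ"  # Oozie-style UTC timestamp, e.g. 2013-11-26T08:00Z
    t, stop = datetime.strptime(start, fmt), datetime.strptime(end, fmt)
    out = []
    while t < stop:
        out.append(t.strftime(fmt))
        t += timedelta(minutes=minutes)
    return out


if __name__ == "__main__":
    # Hypothetical window: four runs at 08:00, 08:05, 08:10 and 08:15 UTC.
    print(nominal_times("2013-11-26T08:00Z", "2013-11-26T08:20Z", 5))
```

Feeding `undefined_variables` the coordinator.xml text plus the keys of both property files reports exactly the variables still missing from the configuration.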