Using OSS in Pig
When using an OSS path, specify it in the following form:
oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/${path}
Parameter descriptions:
- ${AccessKeyId}: the AccessKeyId of your account.
- ${AccessKeySecret}: the secret corresponding to that AccessKeyId.
- ${bucket}: a bucket accessible to that AccessKeyId.
- ${endpoint}: the network endpoint used to access OSS, which is determined by the region of your cluster; the OSS bucket must be in the same region as the cluster.
- ${path}: the path inside the bucket.
For the specific endpoint values, see OSS Endpoint.
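To make the format above concrete, the sketch below assembles such a path from its parts. All values used here (the AccessKey pair, the bucket name, and the endpoint) are hypothetical placeholders for illustration, not real credentials:

```python
# Sketch: assemble an OSS path of the form
#   oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/${path}
# Every value passed in below is a hypothetical placeholder.

def build_oss_path(access_key_id, access_key_secret, bucket, endpoint, path):
    """Join the pieces into the oss:// URI form that the Pig script uses."""
    return "oss://{0}:{1}@{2}.{3}/{4}".format(
        access_key_id, access_key_secret, bucket, endpoint, path.lstrip("/"))

uri = build_oss_path("MyAccessKeyId", "MyAccessKeySecret",
                     "emr", "oss-cn-hangzhou-internal.aliyuncs.com",
                     "data/excite.log.bz2")
print(uri)
# → oss://MyAccessKeyId:MyAccessKeySecret@emr.oss-cn-hangzhou-internal.aliyuncs.com/data/excite.log.bz2
```

The same assembled string is what appears literally in the LOAD and STORE statements of the script below.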
The following example uses the script1-hadoop.pig script bundled with Pig. Upload the tutorial.jar and excite.log.bz2 files from the Pig distribution to OSS; assume the upload paths are oss://emr/jars/tutorial.jar and oss://emr/data/excite.log.bz2. Then follow these steps:
1. Write the script. Modify the jar file path and the input/output paths in the script as shown below. Note that OSS paths take the form oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/object/path.
```pig
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-- Query Phrase Popularity (Hadoop cluster)
-- This script processes a search query log file from the Excite search engine and finds search phrases that occur with particular high frequency during certain times of the day.
-- Register the tutorial JAR file so that the included UDFs can be called in the script.
REGISTER oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/tutorial.jar;
-- Use the PigStorage function to load the excite log file into the 'raw' bag as an array of records.
-- Input: (user,time,query)
raw = LOAD 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
-- Call the NonURLDetector UDF to remove records if the query field is empty or a URL.
clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
-- Call the ToLower UDF to change the query field to lowercase.
clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;
-- Because the log file only contains queries for a single day, we are only interested in the hour.
-- The excite query log timestamp format is YYMMDDHHMMSS.
-- Call the ExtractHour UDF to extract the hour (HH) from the time field.
houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
-- Call the NGramGenerator UDF to compose the n-grams of the query.
ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
-- Use the DISTINCT command to get the unique n-grams for all records.
ngramed2 = DISTINCT ngramed1;
-- Use the GROUP command to group records by n-gram and hour.
hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
-- Use the COUNT function to get the count (occurrences) of each n-gram.
hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
-- Use the GROUP command to group records by n-gram only.
-- Each group now corresponds to a distinct n-gram and has the count for each hour.
uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
-- For each group, identify the hour in which this n-gram is used with a particularly high frequency.
-- Call the ScoreGenerator UDF to calculate a "popularity" score for the n-gram.
uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
-- Use the FOREACH-GENERATE command to assign names to the fields.
uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
-- Use the FILTER command to remove all records with a score less than or equal to 2.0.
filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
-- Use the ORDER command to sort the remaining records by hour and score.
ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
-- Use the PigStorage function to store the results.
-- Output: (hour, n-gram, score, count, average_counts_among_all_hours)
STORE ordered_uniq_frequency INTO 'oss://${AccessKeyId}:${AccessKeySecret}@${bucket}.${endpoint}/data/script1-hadoop-results' USING PigStorage();
```
2. Create a job. Store the script written in step 1 on OSS (assume the storage path is oss://emr/jars/script1-hadoop.pig), and create the corresponding Pig job on the E-MapReduce job page.
3. Create and run an execution plan. On the E-MapReduce execution plan page, create an execution plan and add the Pig job created in the previous step to it. Select the "Run now" policy, and the script1-hadoop job will run on the selected cluster.