Apache Pig是一个用来分析大数据集的平台,它由两部分组成:一部分是用于表达数据分析程序的高级脚本语言,另一部分是用于评估分析程序的基本工具。目前来看,Pig主要用于离线数据的批量处理应用场景,但是随着Pig的发展处理数据的速度会不断地提升,这可能依赖于Pig底层的执行引擎。比如,Pig通过指定执行模式,可以使用Hadoop的MapReduce计算引擎来实现数据处理,也可以使用基于Tez的计算引擎来实现(Tez是为了绕开MapReduce多阶段Job写磁盘而设计的DAG计算引擎,性能应该比MapReduce要快),看到Pig未来的发展路线图,以后可能会基于Storm或Spark计算平台实现底层计算引擎,那样速度会有极大地提升。
我们基于最新的0.15.0版本的Pig(Hadoop使用的是2.2.0版本),通过编写一些例子脚本来实践Pig的语言特性。
Pig安装与执行
Pig安装非常简单,只需要下载Pig包,然后解压缩即可:
2 |
tar xvzf pig-0.15.0. tar .gz |
3 |
sudo ln -s /usr/ local /pig-0.15.0 /usr/ local /pig |
如果希望直接使用pig命令,可以修改环境变量文件~/.bashrc,增加如下配置:
1 |
export PIG_HOME=/usr/ local /pig |
2 |
export PATH=$PATH:$PIG_HOME/bin |
使变量配置生效:
Pig支持如下4种执行模式:
本地模式主要是基于本地文件系统,比较适合调试脚本使用。进入本地模式执行如下命令:
Tez本地模式类似于前面的本地模式,它使用Tez运行时引擎,进入Tez本地模式执行如下命令:
不过该模式还处于试验阶段,不过多累述。
MapReduce模式基于Hadoop,数据存储在HDFS上,它基于运行于YARN之上的MapReduce进行处理。进入MapReduce运行模式执行如下命令:
一般,我们的数据都是存储在HDFS上的,使用该模式能够充分利用Hadoop集群的计算能力。
基于Tez模式执行,需要在安装Hadoop集群的时候,修改Hadoop配置文件mapred-site.xml,将属性mapreduce.framework.name的值设置为yarn-tez。进入Tez模式执行如下命令:
有关Tez相关内容,可以查看Apache Tez官网介绍。
数据类型
Pig的数据类型可以分为2类,分别为简单类型和复杂类型。简单类型包括:
int、long、float、double、chararray、bytearray、boolean、datetime、biginteger、bigdecimal。复杂类型包括:tuple、bag、map。
这里对特别的数据类型,解释说明一下:
chararray相当于字符串String;bytearray相当于字节数组;tuple是一个有序的字段的集合,可以理解为元组,例如(3090018, ‘Android’, 76);bag是tuple的集合,例如{(3090018, ‘Android’, 76), (3090019, ‘iOS’, 172)};map是键值对的集合,例如[name#Jeff Stone, age#28, healthy index#195.58]。
基本操作符
- 算数操作符(Arithmetic Operators)包括:+、-、*、/、%、?:、CASE WHEN THEN ELSE END。
- 布尔操作符(Boolean Operators)包括:AND、OR、IN、NOT。
- 类型转换操作符(Cast Operators):使用圆括号包含类型名,作用于一个字段,例如(int)age、(map[])、(chararray)COUNT($2)、(tuple(chararray,int,map[]))name_age_scores等等。
- 比较操作符(Comparison Operators)包括:==、!=、<、>、<=、>=、matches。其中,matches比较操作符使用Java的Pattern进行匹配来比较,例如user_name matches ‘[a-n]{3,12}’。
- 类型构造操作符(Type Construction Operators):可以创建复杂类型的数据,tuple使用(),map使用[],bag使用{},例如FOREACH users GENERATE (name, age, address)。
- 解引用操作符(Dereference Operators):解引用主要是针对集合类型tuple、bag、map,从集合中拿到对应字段的值。比如对于tuple,定义类型t=tuple(t1:int,t2:int,t3:int),则我要获取字段t1和t3的值,一种方式可以通过t.t1和t.t3得到,也可以通过t.$0和t.$2获取到。
关系操作符
操作符 |
语法 |
说明 |
ASSERT |
ASSERT alias BY expression [, message]; |
断言:判定某个字段的值的条件为true |
COGROUP |
alias = COGROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
数据分组,与GROUP相同,但是至多支持127个关系 |
CROSS |
alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n]; |
笛卡尔积 |
CUBE |
alias = CUBE alias BY { CUBE expression | ROLLUP expression }, [ CUBE expression | ROLLUP expression ] [PARALLEL n]; |
计算CUBE,支持ROLLUP操作 |
DEFINE |
DEFINE macro_name (param [, param ...]) RETURNS {void | alias [, alias ...]} { pig_latin_fragment }; DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; |
定义宏(类似函数),能够重用脚本代码 为UDF或streaming设置别名 |
DISTINCT |
alias = DISTINCT alias [PARTITION BY partitioner] [PARALLEL n]; |
去重操作,可以指定并行度(即Reducer个数) |
FILTER |
alias = FILTER alias BY expression; |
条件过滤 |
FOREACH |
alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….]; alias = FOREACH nested_alias { alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …] GENERATE expression [AS schema] [expression [AS schema]….] }; |
基于列对数据进行转换 |
GROUP |
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
数据分组操作: 只支持一个关系 PARALLEL子句可以指定并行度(Reducer个数) |
IMPORT |
IMPORT ‘file-with-macro’; |
导入外部Pig脚本 |
JOIN |
alias = JOIN alias BY {expression|’(‘expression [, expression …]‘)’} (, alias BY {expression|’(‘expression [, expression …]‘)’} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n]; alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; |
内连接 外连接 |
LIMIT |
alias = LIMIT alias n; |
输出结果集的n个记录 |
LOAD |
LOAD ‘data’ [USING function] [AS schema]; |
从数据源加载数据 |
MAPREDUCE |
alias1 = MAPREDUCE ‘mr.jar’ STORE alias2 INTO ‘inputLocation’ USING storeFunc LOAD ‘outputLocation’ USING loadFunc AS schema [`params, ... `]; |
在Pig中执行MapReduce程序,需要指定使用的MapReduce程序JAR文件 |
ORDER BY |
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n]; |
排序 |
RANK |
alias = RANK alias [ BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [DENSE] ]; |
排名操作:可能有排名相同的,即排名序号相同 |
SAMPLE |
SAMPLE alias size; |
用于采样,size范围[0, 1] |
SPLIT |
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE]; |
将一个大表,拆分成多个小表 |
STORE |
STORE alias INTO ‘directory’ [USING function]; |
存储结果到文件系统:如果为指定USING子句,则使用默认的PigStorage(),更多可以查看“Load/Store函数”。 |
STREAM |
alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema] ; |
将数据发送到外部程序或者脚本 |
UNION |
alias = UNION [ONSCHEMA] alias, alias [, alias …]; |
计算并集 |
关系操作符示例
1 |
live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray); |
2 |
ASSERT live_user_ids BY udid != null , 'udid MUST NOT be NULL!' ; |
上面断言表live_user_ids中的udid字段一定存在值。
根据某个或某些字段进行分组,只根据一个字段进行分组,比较简单。如果想要根据两个字段分组,则可以将两个字段构造成一个tuple,然后进行分组。Pig脚本如下所示:
1 |
events = LOAD '/data/etl/hive_input/20150613/basis_event_2015061306-r-00002' USING PigStorage( '\t' ); |
2 |
projected_events = FOREACH events GENERATE $1 AS (event_code: long), $2 AS (udid: chararray), $10 AS (network: chararray), $34 AS (area_code: int ); |
3 |
uniq_events = DISTINCT projected_events; |
4 |
uniq_events = FILTER uniq_events BY (event_code IS NOT NULL ) AND (udid IS NOT NULL ) AND (network IS NOT NULL ) AND (area_code IS NOT NULL ); |
5 |
groupped = GROUP uniq_events BY (udid, event_code) PARALLEL 2; |
6 |
selected10 = LIMIT groupped 10; |
从HDFS加载的文件中执行投影操作,生成包含event_code、udid、network、area_code这4个字段的一个表projected_events,接着执行去重、条件过滤操作,计算分组的时候基于event_code、udid两个字段进行分组。
FOREACH操作可以针对一个数据集进行迭代处理操作,生成一个新的数据集。它有2种使用方法,一种是执行投影操作,选择部分字段的数据,例如脚本:
1 |
provinces = LOAD '/test/provinces' USING PigStorage( ',' ) AS (country_id: int , province_id: int , name : chararray); |
2 |
compositekeyed_provinces = FOREACH provinces GENERATE (CONCAT(CONCAT((chararray)country_id, '_' ), (chararray)province_id) AS pid, name ); |
这里,将原数据集的两个主键字段的值进行拼接合并,作为新表的一个字段。
另一种是,支持在FOREACH操作中使用代码段,可以增加更复杂的处理逻辑,摘自官网的例子,例如脚本:
01 |
a = LOAD '/test/data' AS (url:chararray, outlink:chararray); |
03 |
(www.ccc.com,www.hjk.com) |
04 |
(www.ddd.com,www.xyz.org) |
05 |
(www.aaa.com,www.cvn.org) |
06 |
(www.www.com,www.kpt.net) |
07 |
(www.www.com,www.xyz.org) |
08 |
(www.ddd.com,www.xyz.org) |
11 |
(www.aaa.com,{(www.aaa.com,www.cvn.org)}) |
12 |
(www.ccc.com,{(www.ccc.com,www.hjk.com)}) |
13 |
(www.ddd.com,{(www.ddd.com,www.xyz.org),(www.ddd.com,www.xyz.org)}) |
14 |
(www.www.com,{(www.www.com,www.kpt.net),(www.www.com,www.xyz.org)}) |
17 |
filterda = FILTER a BY outlink == 'www.xyz.org' ; |
18 |
filtered_outlinks = filterda.outlink; |
19 |
filtered_outlinks = DISTINCT filtered_outlinks; |
20 |
GENERATE group , COUNT (filtered_outlinks); |
上面,表b的第一个字段为chararray类型的字段(存放域名字符串),第二个字段是一个bag类型的字段(存在当前域名的出链接,即<url, outlink>的集合),最后统计的结果是给定的url的出链接的个数。
根据条件进行过滤,相当于SQL中WHERE子句。示例脚本如下所示:
1 |
live_info = LOAD '/test/live_info' USING PigStorage() AS (id: long, name : chararray); |
2 |
newly_added_lives = FILTER live_info BY (id % 140000 >= 0) AND ( name matches '[a-zA-Z0-9]{8, 32}' OR name == 'test' ); |
上面内容很好理解,不再累述。
JOIN操作支持支持配置并行度,指定Reducer的数量。
表连接操作,支持内连接和外连接,内连接脚本示例如下:
1 |
live = LOAD '/test/shiyj/pig/pig_live' USING PigStorage( '\t' ) AS (id: long, name : chararray); |
2 |
program = LOAD '/test/shiyj/pig/pig_live_program' USING PigStorage( '\t' ) AS (id: long, name : chararray,live_id: long,live_start: chararray,live_end: chararray); |
3 |
program_info = JOIN live BY id, program BY live_id; |
根据表live的id字段,表program的live_id字段进行内连接。
外连接的操作,官网给出了4个例子,可以分别看一下。左外连接例子如下:
1 |
A = LOAD 'a.txt' AS (n:chararray, a: int ); |
2 |
B = LOAD 'b.txt' AS (n:chararray, m:chararray); |
3 |
C = JOIN A by $0 LEFT OUTER , B BY $0; |
全外连接的例子如下所示:
1 |
A = LOAD 'a.txt' AS (n:chararray, a: int ); |
2 |
B = LOAD 'b.txt' AS (n:chararray, m:chararray); |
3 |
C = JOIN A BY $0 FULL , B BY $0; |
支持复制的左外连接(Replicated Join),示例如下:
3 |
C= JOIN A BY $0 LEFT , B BY $0 USING 'replicated' ; |
只有左外连接支持这种方式,实际上Replicated Join会在MapReduce的Map阶段把做左表进行复制,也就是说做表应该是小表,能够在内存中放得下,然后与右表进行连接操作。
还有一种使用Skewed Join,示例如下所示:
1 |
A = LOAD 'studenttab' as ( name , age, gpa); |
2 |
B = LOAD 'votertab' as ( name , age, registration, contribution); |
3 |
C = JOIN A BY name FULL , B BY name USING 'skewed' ; |
只有在进行外连接的两表的数据,明显不对称,称为数据倾斜,一个表很大,另一个表相对小,但是内存中放不下,这种情况可以使用Skewed Join操作。目前,Pig支持基于两表的Skewed Join操作。
去重操作使用DISTINCT,比较简单,示例如下所示:
1 |
live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray); |
2 |
uniq_user_ids = DISTINCT live_user_ids PARALLEL 8; |
DISTINCT操作支持配置并行度,指定Reducer的数量。
计算并集操作,使用UNION操作符,示例如下所示:
1 |
a = LOAD 'data' AS (a1: int ,a2: int ,a3: int ); |
2 |
b = LOAD 'data' AS (b1: int ,b2: int ); |
计算并集,不要求两表的字段数一定相同。
LIMIT选择计算结果的一部分,示例如下所示:
1 |
top100_user_ids = LIMIT live_user_ids 100; |
STORE操作用来保存计算结果,示例如下所示:
1 |
STORE play_users INTO '/test/shiyj/tmp.play_users' USING PigStorage ( '\t' ); |
如果没有指定USING子句,则默认使用PigStorage()函数,另外Pig还支持如下的Store/Load函数:
PigStorage([field_delimiter] , ['options']) |
HBaseStorage('columns', ['options']) |
AvroStorage(['schema|record name'], ['options']) |
TrevniStorage(['schema|record name'], ['options']) |
AccumuloStorage(['columns'[, 'options']]) |
具体使用方法,可以参考文档介绍。
比较容易理解,摘自官网上的例子,如下所示:
02 |
a = LOAD 'data1' AS (a1: int ,a2: int ,a3: int ); |
08 |
b = LOAD 'data2' AS (b1: int ,b2: int ); |
这个操作符功能比较强大,如下所示:
01 |
users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid); |
02 |
groupped = COGROUP users BY (create_date,room_id,audio_id); |
03 |
groupped_count = FOREACH groupped { |
04 |
uniq = DISTINCT users.udid; |
05 |
GENERATE group , COUNT (uniq); |
07 |
STORE groupped_count INTO '/test/shiyj/pig/groupped_count' USING PigStorage( '\t' ); |
09 |
groupped_count = LOAD '/test/shiyj/pig/groupped_count/part-r-*' AS (k: tuple(chararray, long, long), cnt: int ); |
10 |
groupped_count = FOREACH groupped_count GENERATE k.$0, k.$1, k.$2, cnt; |
11 |
cubed_users = CUBE groupped_count BY CUBE ($0, $1, $2); |
我们可以看下官网文档的例子,简单比较容易理解:
(1)CUBE操作
Pig脚本内容,如下所示:
1 |
salesinp = LOAD '/pig/data/salesdata' USING PigStorage( ',' ) AS (product:chararray, year : int , region:chararray, state:chararray, city:chararray, sales:long); |
2 |
cubedinp = CUBE salesinp BY CUBE (product, year ); |
3 |
result = FOREACH cubedinp GENERATE FLATTEN( group ), SUM ( cube .sales) AS totalsales; |
如果输入数据为(car, 2012, midwest, ohio, columbus, 4000),则上面脚本执行CUBE操作,结果输出内容如下所示:
上面针对产品(product)和年度(year)两个维度进行查询。
(2)ROLLUP操作
CUBE操作支持ROLLUP(上卷操作),例如脚本内容:
1 |
salesinp = LOAD '/pig/data/salesdata' USING PigStorage( ',' ) AS (product:chararray, year : int , region:chararray, state:chararray, city:chararray, sales:long); |
2 |
rolledup = CUBE salesinp BY ROLLUP (region,state,city); |
3 |
result = FOREACH rolledup GENERATE FLATTEN( group ), SUM ( cube .sales) AS totalsales; |
同样如果输入tuple值为(car, 2012, midwest, ohio, columbus, 4000),则ROLLUP操作结果如下所示:
1 |
(midwest,ohio,columbus,4000) |
上面只是根据ROLLUP表达式指定的维度执行CUBE操作。
(3)合并CUBE和ROLLUP操作
还可以将CUBE操作和ROLLUP操作合并起来,例如执行脚本内容:
1 |
salesinp = LOAD '/pig/data/salesdata' USING PigStorage( ',' ) AS (product:chararray, year : int , region:chararray, state:chararray, city:chararray, sales:long); |
2 |
cubed_and_rolled = CUBE salesinp BY CUBE (product, year ), ROLLUP (region, state, city); |
3 |
result = FOREACH cubed_and_rolled GENERATE FLATTEN( group ), SUM ( cube .sales) AS totalsales; |
上面的CUBE操作等价于下面两中操作:
1 |
cubed_and_rolled = CUBE salesinp BY CUBE (product, year ,region, state, city); |
3 |
cubed_and_rolled = CUBE salesinp BY ROLLUP (product, year ,region, state, city); |
执行结果,如下所示:
01 |
(car,2012,midwest,ohio,columbus,4000) |
02 |
(car,2012,midwest,ohio,,4000) |
03 |
(car,2012,midwest,,,4000) |
05 |
(car,,midwest,ohio,columbus,4000) |
06 |
(car,,midwest,ohio,,4000) |
09 |
(,2012,midwest,ohio,columbus,4000) |
10 |
(,2012,midwest,ohio,,4000) |
11 |
(,2012,midwest,,,4000) |
13 |
(,,midwest,ohio,columbus,4000) |
14 |
(,,midwest,ohio,,4000) |
对数据进行取样操作,脚本如下所示:
02 |
users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid); |
03 |
g_users = GROUP users ALL ; |
04 |
total_user_cnt = FOREACH g_users GENERATE COUNT (users); |
08 |
sampled_users = SAMPLE users 0.15; |
09 |
g_sampled_users = GROUP sampled_users ALL ; |
10 |
sampled_user_cnt = FOREACH g_sampled_users GENERATE COUNT (sampled_users); |
11 |
DUMP sampled_user_cnt; |
MAPREDUCE操作允许在Pig脚本内部执行MapReduce程序,示例脚本来自官网,如下所示:
1 |
A = LOAD 'WordcountInput.txt' ; |
2 |
B = MAPREDUCE 'wordcount.jar' STORE A INTO 'inputDir' LOAD 'outputDir' |
3 |
AS (word:chararray, count : int ) `org.myorg.WordCount inputDir outputDir`; |
如果直接写原生的MapReduce程序各个能解决实际问题,可以将写好的程序打包,在Pig脚本中指定相关参数即可运行。
将一个表拆分成多个表,可以按照“水平拆分”的思想进行操作,根据某些条件来生成新表。示例如下所示:
01 |
A = LOAD 'data' AS (f1: int ,f2: int ,f3: int ); |
07 |
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6); |
求值函数(Eval Functions)
函数 |
语法 |
说明 |
AVG |
AVG(expression) |
计算某一个数字类型的列的均值,数字类型支持:int,long,float,double,bigdecimal,biginteger,bytearray |
BagToString |
BagToString(vals:bag [, delimiter:chararray]) |
将bag转换成字符串,可以指定分隔符,适合拼接bag中字符串 |
CONCAT |
CONCAT(expression, expression, [...expression]) |
字符串拼接 |
COUNT |
COUNT(expression) |
计算一个bag中元素的总数,不含NULL值 |
COUNT_STAR |
COUNT_STAR(expression) |
计算一个bag中元素的总数,包含NULL值 |
DIFF |
DIFF (expression, expression) |
比较一个tuple中的两个字段,这两个字段都是bag类型,结果返回在两个bag中不同的元素,结果仍然是一个bag |
IsEmpty |
IsEmpty(expression) |
检查一个map或bag是否为空 |
MAX |
MAX(expression) |
计算最大值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray |
MIN |
MIN(expression) |
计算最小值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray |
PluckTuple |
DEFINE pluck PluckTuple(expression1) DEFINE pluck PluckTuple(expression1,expression3) pluck(expression2) |
允许定义一个字符串前缀,然后过滤指定的列,满足:一概字符串前缀开始,或者匹配该正则表达式,下面是官网的例子:
DEFINE pluck PluckTuple( 'a::' ); <code> |
d = FOREACH c GENERATE FLATTEN(pluck(*)); |
c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray} |
d: {plucked::a::x: bytearray,plucked::a::y: bytearray} |
DEFINE pluckNegative PluckTuple( 'a::' , 'false' ); |
d = FOREACH c GENERATE FLATTEN(pluckNegative(*)); |
d: {plucked::b::x: bytearray,plucked::b::y: bytearray} |
|
SIZE |
SIZE(expression) |
计算Pig指定数据类型的元素的数量,支持类型:int,long,float,double,,chararray,bytearray、tuple、bag、map |
SUBTRACT |
SUBTRACT(expression, expression) |
bag操作符,用来对两个bag做差集操作,结果为:包含在第一个bag中但不包含在第二个bag中的元素 |
SUM |
SUM(expression) |
求和操作,支持类型:int,long,float,double,bigdecimal,biginteger,将bytearray转换为double类型 |
TOKENIZE |
TOKENIZE(expression [, 'field_delimiter']) |
拆分一个字符串,得到一个bag结果集 |
数学函数
数学函数比较简单,不再详细描述,主要包括如下20个:
具体使用可以查看官方文档。
字符串函数
字符串函数非常常用,主要包括如下20个:
使用方法可以查看文档。
日期时间函数
日期函数有下面24个,如下所示:
集合函数
集合函数主要是,将其他类型的数据转换为集合类型tuple、bag、map,如下所示:
TOTUPLE
TOBAG
TOMAP
TOP
前面3个都是生成集合的函数,最后一个用来计算一个集合中的topN个元素,可以指定是按照升序/降序得到的结果,语法为TOP(topN,column,relation)。
Hive UDF函数
在Pig中可以直接调用Hive的UDF,HiveUDAF和HiveUDTF,语法如下表所示:
函数 |
语法 |
说明 |
HiveUDF |
HiveUDF(name[, constant parameters]) |
DEFINE sin HiveUDF( 'sin' ); |
a = LOAD 'student' AS ( name :chararray, age: int , gpa: double ); |
b = FOREACH a GENERATE sin(gpa); |
|
HiveUDAF |
HiveUDAF(name[, constant parameters]) |
DEFINE explode HiveUDTF( 'explode' ); |
a = LOAD 'mydata' AS (a0:{(b0:chararray)}); |
b = FOREACH a GENERATE FLATTEN(explode(a0)); |
|
HiveUDTF |
HiveUDTF(name[, constant parameters]) |
DEFINE avg HiveUDAF( 'avg' ); |
a = LOAD 'student' AS ( name :chararray, age: int , gpa: double ); |
c = FOREACH b GENERATE group , AVG (a.age); |
|
Pig UDF
Pig也支持用户自定义函数UDF,而且支持使用多种编程语言来实现UDF,目前支持的变成语言包括:Java、JavaScript、Jython、Ruby、Groovy、Python。
以Java为例,可以通过继承自类org.apache.pig.EvalFunc来实现一个UDF,将实现的UDF泪打包后,Pig安装目录下的CLASSPATH下面,然后可以在Pig脚本中使用。例如,我们实现的UDF类为org.shirdrn.pig.udf.IPAddressConverterUDF,用来根据ip代码(long类型),转换为对应的点分十进制的IP地址字符串,打包后JAR文件名称为iptool.jar,则可以在Pig脚本中这样使用:
2 |
a = LOAD '/data/etl/$date_string/$event_file' AS (event_code: long, udid: chararray, ip_code: long, network: chararray); |
3 |
b = FOREACH a GENERATE event_code, udid, org.shirdrn.etl.pig.udf.IPAddressConverterUDF(ip_code); |
也可以实现一个自定义的累加器(Accumulator)或者过滤器,或者其他一些功能,可以实现相关的接口:Algebraic,Accumulator,FilterFunc,LoadFunc,StoreFunc,具体可以参考相关文档或资料。
另外,也可以通过在PiggyBank在来查找其它用户分享的UDF,可以在http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank中找到。
相关问题总结
在实际使用中,我们经常需要从外部传递参数到Pig脚本中,例如,文件路径,或者日期时间,等等,可以直接使用pig命令的-p参数从外部传参,例如,有下面的Pig脚本compute_event_user_count.pig:
1 |
a = LOAD '/data/etl/hive_input/20150613/basis_event_2015061305-r-00001' USING PigStorage( '\t' ); |
3 |
b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id; |
4 |
c = GROUP b BY (event_code, udid); |
5 |
d = FOREACH c GENERATE group , COUNT ($1); |
上面我们是直接将文件路径写死在脚本中,如果需要从外部传递输入文件、输出目录,则可以改写为:
1 |
a = LOAD '$input_file' USING PigStorage( '\t' ); |
3 |
b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id; |
4 |
c = GROUP b BY (event_code, udid); |
5 |
d = FOREACH c GENERATE group , COUNT ($1); |
6 |
STORE d INTO '$output_dir' USING PigStorage ( '\t' ); |
则可以执行Pig脚本,并传递参数:
1 |
bin/pig -p input_file=/data/etl/hive_input/20150613/basis_event_2015061305-r-00001 -p output_dir=/ test /shiyj/pig/example_output -x mapreduce compute_event_user_count.pig |
这样就可以实现从外部向Pig脚本传递参数,可以到HDFS上查看结果输出文件/test/shiyj/pig/example_output/part-r-00000。pig命令更多选项,可以查看pig帮助命令:
- 运行Pig脚本出现异常“Retrying connect to server: 0.0.0.0/0.0.0.0:10020”
实际应用中,我们几乎不可能将Pig安装到Hadoop集群的NameNode所在的节点,如果可以安装到NameNode节点上,基本不会报这个错误的。这个错误主要有是由于Pig没有安装在NameNode节点上,而是以外的其它节点上,它在执行计算过程中,需要与MapReduce的JobHistoryServer的IPC服务进行通信,所以在安装Hadoop时需要允许JobHistoryServer的IPC主机和端口被外部其它节点访问,只需要修改etc/hadoop/mapreduce-site.xml配置文件,增加如下配置即可:
2 |
< name >mapreduce.jobhistory.address</ name > |
3 |
< value >10.10.4.130:10020</ value > |
4 |
< description >MapReduce JobHistory Server IPC host:port</ description > |
如果第一次安装Hadoop没有配置该属性mapreduce.jobhistory.address,那么Hadoop集群的所有节点上会使用默认的配置值为0.0.0.0:10020,所以如果不在NameNode上安装Pig程序,导致Pig所在的节点上安装的Hadoop的配置属性mapreduce.jobhistory.address使用默认值,也就是Pig所在节点0.0.0.0:10020,所以Pig脚本就会执行过程中与本机的10020端口通信,显然会失败的。
其实,如果已经在NameNode上启动了JobHistoryServer进程,只需要修改mapreduce.jobhistory.address的属性值,然后同步到所有安装Hadoop文件的节点,包括Pig所在节点即可,不需要重启NameNode节点上的JobHistoryServer进程。如果没有在NameNode上启动JobHistoryServer进程,执行如下命令启动即可:
1 |
mr-jobhistory-daemon.sh start historyserver |