Apache Pig简介与实践

简介:

Apache Pig是一个用来分析大数据集的平台,它由两部分组成:一部分是用于表达数据分析程序的高级脚本语言,另一部分是用于评估分析程序的基本工具。目前来看,Pig主要用于离线数据的批量处理应用场景,但是随着Pig的发展处理数据的速度会不断地提升,这可能依赖于Pig底层的执行引擎。比如,Pig通过指定执行模式,可以使用Hadoop的MapReduce计算引擎来实现数据处理,也可以使用基于Tez的计算引擎来实现(Tez是为了绕开MapReduce多阶段Job写磁盘而设计的DAG计算引擎,性能应该比MapReduce要快),看到Pig未来的发展路线图,以后可能会基于Storm或Spark计算平台实现底层计算引擎,那样速度会有极大地提升。
我们基于最新的0.15.0版本的Pig(Hadoop使用的是2.2.0版本),通过编写一些例子脚本来实践Pig的语言特性。

Pig安装与执行

Pig安装非常简单,只需要下载Pig包,然后解压缩即可:

1 wget http://mirror.bit.edu.cn/apache/pig/pig-0.15.0/pig-0.15.0.tar.gz
2 tar xvzf pig-0.15.0.tar.gz
3 sudo ln -s /usr/local/pig-0.15.0 /usr/local/pig
4 cd /usr/local/pig
5 bin/pig -x mapreduce

如果希望直接使用pig命令,可以修改环境变量文件~/.bashrc,增加如下配置:

1 export PIG_HOME=/usr/local/pig
2 export PATH=$PATH:$PIG_HOME/bin

使变量配置生效:

1 . ~/.bashrc

Pig支持如下4种执行模式:

  • 本地模式

本地模式主要是基于本地文件系统,比较适合调试脚本使用。进入本地模式执行如下命令:

1 pig -x local
  • Tez本地模式

Tez本地模式类似于前面的本地模式,它使用Tez运行时引擎,进入Tez本地模式执行如下命令:

1 pig -x tez_local

不过该模式还处于试验阶段,不过多累述。

  • MapReduce模式

MapReduce模式基于Hadoop,数据存储在HDFS上,它基于运行于YARN之上的MapReduce进行处理。进入MapReduce运行模式执行如下命令:

1 pig -x mapreduce

一般,我们的数据都是存储在HDFS上的,使用该模式能够充分利用Hadoop集群的计算能力。

  • Tez模式

基于Tez模式执行,需要在安装Hadoop集群的时候,修改Hadoop配置文件mapred-site.xml,将属性mapreduce.framework.name的值设置为yarn-tez。进入Tez模式执行如下命令:

1 pig -x tez

有关Tez相关内容,可以查看Apache Tez官网介绍。

数据类型

Pig的数据类型可以分为2类,分别为简单类型和复杂类型。简单类型包括:
int、long、float、double、chararray、bytearray、boolean、datetime、biginteger、bigdecimal。复杂类型包括:tuple、bag、map。
这里对特别的数据类型,解释说明一下:
chararray相当于字符串String;bytearray相当于字节数组;tuple是一个有序的字段的集合,可以理解为元组,例如(3090018, ‘Android’, 76);bag是tuple的集合,例如{(3090018, ‘Android’, 76), (3090019, ‘iOS’, 172)};map是键值对的集合,例如[name#Jeff Stone, age#28, healthy index#195.58]。

基本操作符

  • 算数操作符(Arithmetic Operators)包括:+、-、*、/、%、?:、CASE WHEN THEN ELSE END。
  • 布尔操作符(Boolean Operators)包括:AND、OR、IN、NOT。
  • 类型转换操作符(Cast Operators):使用圆括号包含类型名,作用于一个字段,例如(int)age、(map[])、(chararray)COUNT($2)、(tuple(chararray,int,map[]))name_age_scores等等。
  • 比较操作符(Comparison Operators)包括:==、!=、<、>、<=、>=、matches。其中,matches比较操作符使用Java的Pattern进行匹配来比较,例如user_name matches ‘[a-n]{3,12}’。
  • 类型构造操作符(Type Construction Operators):可以创建复杂类型的数据,tuple使用(),map使用[],bag使用{},例如FOREACH users GENERATE (name, age, address)。
  • 解引用操作符(Dereference Operators):解引用主要是针对集合类型tuple、bag、map,从集合中拿到对应字段的值。比如对于tuple,定义类型t=tuple(t1:int,t2:int,t3:int),则我要获取字段t1和t3的值,一种方式可以通过t.t1和t.t3得到,也可以通过t.$0和t.$2获取到。

关系操作符

操作符 语法 说明
ASSERT ASSERT alias BY expression [, message]; 断言:判定某个字段的值的条件为true
COGROUP alias = COGROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; 数据分组,与GROUP相同,但是至多支持127个关系
CROSS alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n]; 笛卡尔积
CUBE alias = CUBE alias BY { CUBE expression | ROLLUP expression }, [ CUBE expression | ROLLUP expression ] [PARALLEL n]; 计算CUBE,支持ROLLUP操作
DEFINE DEFINE macro_name (param [, param ...]) RETURNS {void | alias [, alias ...]} { pig_latin_fragment };
DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] };
定义宏(类似函数),能够重用脚本代码
为UDF或streaming设置别名
DISTINCT alias = DISTINCT alias [PARTITION BY partitioner] [PARALLEL n]; 去重操作,可以指定并行度(即Reducer个数)
FILTER alias = FILTER alias BY expression; 条件过滤
FOREACH alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];

alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
GENERATE expression [AS schema] [expression [AS schema]….]
};

基于列对数据进行转换
GROUP alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n]; 数据分组操作:
只支持一个关系
PARALLEL子句可以指定并行度(Reducer个数)
IMPORT IMPORT ‘file-with-macro’; 导入外部Pig脚本
JOIN alias = JOIN alias BY {expression|’(‘expression [, expression …]‘)’} (, alias BY {expression|’(‘expression [, expression …]‘)’} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];

alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

内连接
外连接
LIMIT alias = LIMIT alias n; 输出结果集的n个记录
LOAD LOAD ‘data’ [USING function] [AS schema]; 从数据源加载数据
MAPREDUCE alias1 = MAPREDUCE ‘mr.jar’ STORE alias2 INTO ‘inputLocation’ USING storeFunc LOAD ‘outputLocation’ USING loadFunc AS schema [`params, ... `]; 在Pig中执行MapReduce程序,需要指定使用的MapReduce程序JAR文件
ORDER BY alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n]; 排序
RANK alias = RANK alias [ BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [DENSE] ]; 排名操作:可能有排名相同的,即排名序号相同
SAMPLE SAMPLE alias size; 用于采样,size范围[0, 1]
SPLIT SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE]; 将一个大表,拆分成多个小表
STORE STORE alias INTO ‘directory’ [USING function]; 存储结果到文件系统:如果为指定USING子句,则使用默认的PigStorage(),更多可以查看“Load/Store函数”。
STREAM alias = STREAM alias [, alias …] THROUGH {`command` | cmd_alias } [AS schema] ; 将数据发送到外部程序或者脚本
UNION alias = UNION [ONSCHEMA] alias, alias [, alias …]; 计算并集

关系操作符示例

  • ASSERT
1 live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray);
2 ASSERT live_user_ids BY udid != null, 'udid MUST NOT be NULL!';

上面断言表live_user_ids中的udid字段一定存在值。

  • GROUP

根据某个或某些字段进行分组,只根据一个字段进行分组,比较简单。如果想要根据两个字段分组,则可以将两个字段构造成一个tuple,然后进行分组。Pig脚本如下所示:

1 events = LOAD '/data/etl/hive_input/20150613/basis_event_2015061306-r-00002' USING PigStorage('\t');
2 projected_events = FOREACH events GENERATE $1 AS (event_code: long), $2 AS (udid: chararray), $10 AS (network: chararray), $34 AS (area_code: int); -- $1是表events的第2个字段
3 uniq_events = DISTINCT projected_events;
4 uniq_events = FILTER uniq_events BY (event_code IS NOT NULL) AND (udid IS NOT NULL)AND (network IS NOT NULL) AND (area_code IS NOT NULL);
5 groupped = GROUP uniq_events BY (udid, event_code) PARALLEL 2; -- 指定使用2个Reducer
6 selected10 = LIMIT groupped 10;
7 DUMP selected10;

从HDFS加载的文件中执行投影操作,生成包含event_code、udid、network、area_code这4个字段的一个表projected_events,接着执行去重、条件过滤操作,计算分组的时候基于event_code、udid两个字段进行分组。

  • FOREACH

FOREACH操作可以针对一个数据集进行迭代处理操作,生成一个新的数据集。它有2种使用方法,一种是执行投影操作,选择部分字段的数据,例如脚本:

1 provinces = LOAD '/test/provinces' USING PigStorage(',') AS (country_id: int, province_id: int, name: chararray);
2 compositekeyed_provinces = FOREACH provinces GENERATE (CONCAT(CONCAT((chararray)country_id, '_'), (chararray)province_id) AS pid, name);

这里,将原数据集的两个主键字段的值进行拼接合并,作为新表的一个字段。
另一种是,支持在FOREACH操作中使用代码段,可以增加更复杂的处理逻辑,摘自官网的例子,例如脚本:

01 a = LOAD '/test/data' AS (url:chararray, outlink:chararray);
02 DUMP a;
03 (www.ccc.com,www.hjk.com)
04 (www.ddd.com,www.xyz.org)
05 (www.aaa.com,www.cvn.org)
06 (www.www.com,www.kpt.net)
07 (www.www.com,www.xyz.org)
08 (www.ddd.com,www.xyz.org)
09 b = GROUP a BY url;
10 DUMP b;
11 (www.aaa.com,{(www.aaa.com,www.cvn.org)})
12 (www.ccc.com,{(www.ccc.com,www.hjk.com)})
13 (www.ddd.com,{(www.ddd.com,www.xyz.org),(www.ddd.com,www.xyz.org)})
14 (www.www.com,{(www.www.com,www.kpt.net),(www.www.com,www.xyz.org)})
15
16 result = FOREACH b {
17 filterda = FILTER a BY outlink == 'www.xyz.org'; -- 过滤掉outlink字段值为'www.xyz.org'的记录
18 filtered_outlinks = filterda.outlink;
19 filtered_outlinks = DISTINCT filtered_outlinks; -- 对outlink集合进行去重
20 GENERATE group, COUNT(filtered_outlinks); -- 根据对表a进行分组得到group,计算每个分组中outlink的数量
21 };
22 DUMP result;
23 (www.aaa.com,0)
24 (www.ccc.com,0)
25 (www.ddd.com,1)
26 (www.www.com,1)

上面,表b的第一个字段为chararray类型的字段(存放域名字符串),第二个字段是一个bag类型的字段(存在当前域名的出链接,即<url, outlink>的集合),最后统计的结果是给定的url的出链接的个数。

  • FILTER

根据条件进行过滤,相当于SQL中WHERE子句。示例脚本如下所示:

1 live_info = LOAD '/test/live_info' USING PigStorage() AS (id: long, name: chararray);
2 newly_added_lives = FILTER live_info BY (id % 140000 >= 0) AND (name matches '[a-zA-Z0-9]{8, 32}' OR name == 'test');

上面内容很好理解,不再累述。

  • JOIN

JOIN操作支持支持配置并行度,指定Reducer的数量。
表连接操作,支持内连接和外连接,内连接脚本示例如下:

1 live = LOAD '/test/shiyj/pig/pig_live' USING PigStorage('\t') AS (id: long, name: chararray);
2 program = LOAD '/test/shiyj/pig/pig_live_program' USING PigStorage('\t') AS (id: long,name: chararray,live_id: long,live_start: chararray,live_end: chararray);
3 program_info = JOIN live BY id, program BY live_id;
4 DUMP program_info;

根据表live的id字段,表program的live_id字段进行内连接。
外连接的操作,官网给出了4个例子,可以分别看一下。左外连接例子如下:

1 A = LOAD 'a.txt' AS (n:chararray, a:int);
2 B = LOAD 'b.txt' AS (n:chararray, m:chararray);
3 C = JOIN A by $0 LEFT OUTER, B BY $0; -- 表A和B的字段n进行左外连接

全外连接的例子如下所示:

1 A = LOAD 'a.txt' AS (n:chararray, a:int);
2 B = LOAD 'b.txt' AS (n:chararray, m:chararray);
3 C = JOIN A BY $0 FULL, B BY $0; -- 使用FULL关键字

支持复制的左外连接(Replicated Join),示例如下:

1 A = LOAD 'large';
2 B = LOAD 'tiny';
3 C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';

只有左外连接支持这种方式,实际上Replicated Join会在MapReduce的Map阶段把做左表进行复制,也就是说做表应该是小表,能够在内存中放得下,然后与右表进行连接操作。
还有一种使用Skewed Join,示例如下所示:

1 A = LOAD 'studenttab' as (name, age, gpa);
2 B = LOAD 'votertab' as (name, age, registration, contribution);
3 C = JOIN A BY name FULL, B BY name USING 'skewed';

只有在进行外连接的两表的数据,明显不对称,称为数据倾斜,一个表很大,另一个表相对小,但是内存中放不下,这种情况可以使用Skewed Join操作。目前,Pig支持基于两表的Skewed Join操作。

  • DISTINCT

去重操作使用DISTINCT,比较简单,示例如下所示:

1 live_user_ids = LOAD '/test/live_user_ids' USING PigStorage() AS (udid: chararray);
2 uniq_user_ids = DISTINCT live_user_ids PARALLEL 8;

DISTINCT操作支持配置并行度,指定Reducer的数量。

  • UNION

计算并集操作,使用UNION操作符,示例如下所示:

1 a = LOAD 'data' AS (a1:int,a2:int,a3:int);
2 b = LOAD 'data' AS (b1:int,b2:int);
3 u = UNION a, b;

计算并集,不要求两表的字段数一定相同。

  • LIMIT

LIMIT选择计算结果的一部分,示例如下所示:

1 top100_user_ids = LIMIT live_user_ids 100;
  • STORE

STORE操作用来保存计算结果,示例如下所示:

1 STORE play_users INTO '/test/shiyj/tmp.play_users' USING PigStorage ('\t');

如果没有指定USING子句,则默认使用PigStorage()函数,另外Pig还支持如下的Store/Load函数:

BinStorage()
JsonLoader(['schema'])
JsonStorage()
PigDump()
PigStorage([field_delimiter] , ['options'])
TextLoader()
HBaseStorage('columns', ['options'])
AvroStorage(['schema|record name'], ['options'])
TrevniStorage(['schema|record name'], ['options'])
AccumuloStorage(['columns'[, 'options']])
OrcStorage(['options'])

具体使用方法,可以参考文档介绍。

  • CROSS

比较容易理解,摘自官网上的例子,如下所示:

01 -- 加载表a数据
02 a = LOAD 'data1' AS (a1:int,a2:int,a3:int);
03 DUMP a;
04 (1,2,3)
05 (4,2,1)
06
07 -- 加载表b数据
08 b = LOAD 'data2' AS (b1:int,b2:int);
09 DUMP b;
10 (2,4)
11 (8,9)
12 (1,3)
13
14 -- 计算笛卡尔积,并输出结果
15 result = CROSS a, b;
16 DUMP result;
17 (1,2,3,2,4)
18 (1,2,3,8,9)
19 (1,2,3,1,3)
20 (4,2,1,2,4)
21 (4,2,1,8,9)
22 (4,2,1,1,3)
  • CUBE

这个操作符功能比较强大,如下所示:

01 users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid);
02 groupped = COGROUP users BY (create_date,room_id,audio_id);
03 groupped_count = FOREACH groupped {
04 uniq = DISTINCT users.udid;
05 GENERATE group, COUNT(uniq);
06 };
07 STORE groupped_count INTO '/test/shiyj/pig/groupped_count' USING PigStorage('\t'); -- 将分组统计的结果保存到HDFS
08
09 groupped_count = LOAD '/test/shiyj/pig/groupped_count/part-r-*' AS (k: tuple(chararray, long, long), cnt: int); -- 加载前面保存的结果,进行CUBE计算
10 groupped_count = FOREACH groupped_count GENERATE k.$0, k.$1, k.$2, cnt;
11 cubed_users = CUBE groupped_count BY CUBE($0, $1, $2);
12 DUMP cubed_users;

我们可以看下官网文档的例子,简单比较容易理解:
(1)CUBE操作
Pig脚本内容,如下所示:

1 salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray,year:int, region:chararray, state:chararray, city:chararray, sales:long);
2 cubedinp = CUBE salesinp BY CUBE(product,year);
3 result = FOREACH cubedinp GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
4 DUMP result;

如果输入数据为(car, 2012, midwest, ohio, columbus, 4000),则上面脚本执行CUBE操作,结果输出内容如下所示:

1 (car,2012,4000)
2 (car,,4000)
3 (,2012,4000)
4 (,,4000)

上面针对产品(product)和年度(year)两个维度进行查询。
(2)ROLLUP操作
CUBE操作支持ROLLUP(上卷操作),例如脚本内容:

1 salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray,year:int, region:chararray, state:chararray, city:chararray, sales:long);
2 rolledup = CUBE salesinp BY ROLLUP(region,state,city);
3 result = FOREACH rolledup GENERATE FLATTEN(group), SUM(cube.sales) AS totalsales;
4 DUMP result;

同样如果输入tuple值为(car, 2012, midwest, ohio, columbus, 4000),则ROLLUP操作结果如下所示:

1 (midwest,ohio,columbus,4000)
2 (midwest,ohio,,4000)
3 (midwest,,,4000)
4 (,,,4000)

上面只是根据ROLLUP表达式指定的维度执行CUBE操作。
(3)合并CUBE和ROLLUP操作
还可以将CUBE操作和ROLLUP操作合并起来,例如执行脚本内容:

1 salesinp = LOAD '/pig/data/salesdata' USING PigStorage(',') AS (product:chararray,year:int, region:chararray, state:chararray, city:chararray, sales:long);
2 cubed_and_rolled = CUBE salesinp BY CUBE(product,year), ROLLUP(region, state, city);
3 result = FOREACH cubed_and_rolled GENERATE FLATTEN(group), SUM(cube.sales) AStotalsales;

上面的CUBE操作等价于下面两中操作:

1 cubed_and_rolled = CUBE salesinp BY CUBE(product,year,region, state, city);
2 -- 或
3 cubed_and_rolled = CUBE salesinp BY ROLLUP(product,year,region, state, city);

执行结果,如下所示:

01 (car,2012,midwest,ohio,columbus,4000)
02 (car,2012,midwest,ohio,,4000)
03 (car,2012,midwest,,,4000)
04 (car,2012,,,,4000)
05 (car,,midwest,ohio,columbus,4000)
06 (car,,midwest,ohio,,4000)
07 (car,,midwest,,,4000)
08 (car,,,,,4000)
09 (,2012,midwest,ohio,columbus,4000)
10 (,2012,midwest,ohio,,4000)
11 (,2012,midwest,,,4000)
12 (,2012,,,,4000)
13 (,,midwest,ohio,columbus,4000)
14 (,,midwest,ohio,,4000)
15 (,,midwest,,,4000)
16 (,,,,,4000)
  • SAMPLE

对数据进行取样操作,脚本如下所示:

01 -- 加载原始数据集,并计算记录数
02 users = LOAD '/test/shiyj/pig/pig_live_users' AS (create_date,room_id,audio_id,udid);
03 g_users = GROUP users ALL;
04 total_user_cnt = FOREACH g_users GENERATE COUNT(users);
05 DUMP total_user_cnt;
06
07 -- 15%取样,计算取样记录数
08 sampled_users = SAMPLE users 0.15;
09 g_sampled_users = GROUP sampled_users ALL;
10 sampled_user_cnt = FOREACH g_sampled_users GENERATE COUNT(sampled_users);
11 DUMP sampled_user_cnt;
  • MAPREDUCE

MAPREDUCE操作允许在Pig脚本内部执行MapReduce程序,示例脚本来自官网,如下所示:

1 A = LOAD 'WordcountInput.txt';
2 B = MAPREDUCE 'wordcount.jar' STORE A INTO 'inputDir' LOAD 'outputDir'
3 AS (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;

如果直接写原生的MapReduce程序各个能解决实际问题,可以将写好的程序打包,在Pig脚本中指定相关参数即可运行。

  • SPLIT

将一个表拆分成多个表,可以按照“水平拆分”的思想进行操作,根据某些条件来生成新表。示例如下所示:

01 A = LOAD 'data' AS (f1:int,f2:int,f3:int); -- 加载数据到表A
02 DUMP A;
03 (1,2,3)
04 (4,5,6)
05 (7,8,9)
06
07 SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6); -- A表中满足条件f1<7的记录插入到表X中,满足条件f2==5的记录插入到T表中,满足条件(f3<6 OR f3>6)的记录插入到Z表中
08
09 DUMP X;
10 (1,2,3)
11 (4,5,6)
12
13 DUMP Y;
14 (4,5,6)
15
16 DUMP Z;
17 (1,2,3)
18 (7,8,9)

求值函数(Eval Functions)

函数 语法 说明
AVG AVG(expression) 计算某一个数字类型的列的均值,数字类型支持:int,long,float,double,bigdecimal,biginteger,bytearray
BagToString BagToString(vals:bag [, delimiter:chararray]) 将bag转换成字符串,可以指定分隔符,适合拼接bag中字符串
CONCAT CONCAT(expression, expression, [...expression]) 字符串拼接
COUNT COUNT(expression) 计算一个bag中元素的总数,不含NULL值
COUNT_STAR COUNT_STAR(expression) 计算一个bag中元素的总数,包含NULL值
DIFF DIFF (expression, expression) 比较一个tuple中的两个字段,这两个字段都是bag类型,结果返回在两个bag中不同的元素,结果仍然是一个bag
IsEmpty IsEmpty(expression) 检查一个map或bag是否为空
MAX MAX(expression) 计算最大值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray
MIN MIN(expression) 计算最小值,支持数组类型:int,long,float,double,bigdecimal,biginteger,bytearray
PluckTuple DEFINE pluck PluckTuple(expression1)
DEFINE pluck PluckTuple(expression1,expression3)
pluck(expression2)
允许定义一个字符串前缀,然后过滤指定的列,满足:一概字符串前缀开始,或者匹配该正则表达式,下面是官网的例子:
a = LOAD 'a' as (x, y);
b = LOAD 'b' as (x, y);
c = JOIN a by x, b by x; -- 表a和b连接,因为表a和b有相同的列名,所以连接后添加前缀“表名::”来区分
DEFINE pluck PluckTuple('a::'); <code>-- 定义前缀"a::",等价于DEFINE pluck PluckTuple('a::', true);
d = FOREACH c GENERATE FLATTEN(pluck(*)); -- 包含前缀"a::"的都保留
DESCRIBE c;
c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
DESCRIBE d;
d: {plucked::a::x: bytearray,plucked::a::y: bytearray}
DEFINE pluckNegative PluckTuple('a::','false'); -- 定义前缀"a::",包含该前缀的过滤掉
d = FOREACH c GENERATE FLATTEN(pluckNegative(*));
DESCRIBE d; -- 结果中包含前缀"a::"的被排除掉
d: {plucked::b::x: bytearray,plucked::b::y: bytearray}
SIZE SIZE(expression) 计算Pig指定数据类型的元素的数量,支持类型:int,long,float,double,,chararray,bytearray、tuple、bag、map
SUBTRACT SUBTRACT(expression, expression) bag操作符,用来对两个bag做差集操作,结果为:包含在第一个bag中但不包含在第二个bag中的元素
SUM SUM(expression) 求和操作,支持类型:int,long,float,double,bigdecimal,biginteger,将bytearray转换为double类型
TOKENIZE TOKENIZE(expression [, 'field_delimiter']) 拆分一个字符串,得到一个bag结果集

数学函数

数学函数比较简单,不再详细描述,主要包括如下20个:

ABS
ACOS
ASIN
ATAN
CBRT
CEIL
COS
COSH
EXP
FLOOR
LOG
LOG10
RANDOM
ROUND
ROUND_TO
SIN
SINH
SQRT
TAN
TANH

具体使用可以查看官方文档。

字符串函数

字符串函数非常常用,主要包括如下20个:

ENDSWITH
EqualsIgnoreCase
INDEXOF
LAST_INDEX_OF
LCFIRST
LOWER
LTRIM
REGEX_EXTRACT
REGEX_EXTRACT_ALL
REPLACE
RTRIM
SPRINTF
STARTSWITH
STRSPLIT
STRSPLITTOBAG
SUBSTRING
TRIM
UCFIRST
UPPER
UniqueID

使用方法可以查看文档。

日期时间函数

日期函数有下面24个,如下所示:

AddDuration
CurrentTime
DaysBetween
GetDay
GetHour
GetMilliSecond
GetMinute
GetMonth
GetSecond
GetWeek
GetWeekYear
GetYear
HoursBetween
MilliSecondsBetween
MinutesBetween
MonthsBetween
SecondsBetween
SubtractDuration
ToDate
ToMilliSeconds
ToString
ToUnixTime
WeeksBetween
YearsBetween

集合函数

集合函数主要是,将其他类型的数据转换为集合类型tuple、bag、map,如下所示:
TOTUPLE
TOBAG
TOMAP
TOP
前面3个都是生成集合的函数,最后一个用来计算一个集合中的topN个元素,可以指定是按照升序/降序得到的结果,语法为TOP(topN,column,relation)。

Hive UDF函数

在Pig中可以直接调用Hive的UDF,HiveUDAF和HiveUDTF,语法如下表所示:

函数 语法 说明
HiveUDF HiveUDF(name[, constant parameters])
DEFINE sin HiveUDF('sin'); -- 定义HiveUDF,后面可以直接使用函数sin
a = LOAD 'student' AS (name:chararray, age:int, gpa:double);
b = FOREACH a GENERATE sin(gpa); -- 使用函数sin
HiveUDAF HiveUDAF(name[, constant parameters])
DEFINE explode HiveUDTF('explode');
a = LOAD 'mydata' AS (a0:{(b0:chararray)});
b = FOREACH a GENERATE FLATTEN(explode(a0));
HiveUDTF HiveUDTF(name[, constant parameters])
DEFINE avg HiveUDAF('avg');
a = LOAD 'student' AS (name:chararray, age:int, gpa:double);
b = GROUP a BY name;
c = FOREACH b GENERATE group,AVG(a.age);

Pig UDF

Pig也支持用户自定义函数UDF,而且支持使用多种编程语言来实现UDF,目前支持的变成语言包括:Java、JavaScript、Jython、Ruby、Groovy、Python。
以Java为例,可以通过继承自类org.apache.pig.EvalFunc来实现一个UDF,将实现的UDF泪打包后,Pig安装目录下的CLASSPATH下面,然后可以在Pig脚本中使用。例如,我们实现的UDF类为org.shirdrn.pig.udf.IPAddressConverterUDF,用来根据ip代码(long类型),转换为对应的点分十进制的IP地址字符串,打包后JAR文件名称为iptool.jar,则可以在Pig脚本中这样使用:

1 REGISTER 'iptool.jar';
2 a = LOAD '/data/etl/$date_string/$event_file' AS (event_code: long, udid: chararray, ip_code: long, network: chararray);
3 b = FOREACH a GENERATE event_code, udid, org.shirdrn.etl.pig.udf.IPAddressConverterUDF(ip_code);
4 DUMP b;

也可以实现一个自定义的累加器(Accumulator)或者过滤器,或者其他一些功能,可以实现相关的接口:Algebraic,Accumulator,FilterFunc,LoadFunc,StoreFunc,具体可以参考相关文档或资料。
另外,也可以通过在PiggyBank在来查找其它用户分享的UDF,可以在http://svn.apache.org/repos/asf/pig/trunk/contrib/piggybank中找到。

相关问题总结

  • 运行Pig脚本,从外部向脚本传递参数

在实际使用中,我们经常需要从外部传递参数到Pig脚本中,例如,文件路径,或者日期时间,等等,可以直接使用pig命令的-p参数从外部传参,例如,有下面的Pig脚本compute_event_user_count.pig:

1 a = LOAD '/data/etl/hive_input/20150613/basis_event_2015061305-r-00001' USING PigStorage('\t');
2 DESCRIBE a;
3 b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id;
4 c = GROUP b BY (event_code, udid);
5 d = FOREACH c GENERATE group, COUNT($1);
6 DUMP d;

上面我们是直接将文件路径写死在脚本中,如果需要从外部传递输入文件、输出目录,则可以改写为:

1 a = LOAD '$input_file' USING PigStorage('\t');
2 DESCRIBE a;
3 b = FOREACH a GENERATE $1 AS event_code, $2 AS udid, $4 AS install_id;
4 c = GROUP b BY (event_code, udid);
5 d = FOREACH c GENERATE group, COUNT($1);
6 STORE d INTO '$output_dir' USING PigStorage ('\t');

则可以执行Pig脚本,并传递参数:

1 bin/pig -p input_file=/data/etl/hive_input/20150613/basis_event_2015061305-r-00001 -p output_dir=/test/shiyj/pig/example_output -x mapreduce compute_event_user_count.pig

这样就可以实现从外部向Pig脚本传递参数,可以到HDFS上查看结果输出文件/test/shiyj/pig/example_output/part-r-00000。pig命令更多选项,可以查看pig帮助命令:

1 bin/pig -h
  • 运行Pig脚本出现异常“Retrying connect to server: 0.0.0.0/0.0.0.0:10020”

实际应用中,我们几乎不可能将Pig安装到Hadoop集群的NameNode所在的节点,如果可以安装到NameNode节点上,基本不会报这个错误的。这个错误主要有是由于Pig没有安装在NameNode节点上,而是以外的其它节点上,它在执行计算过程中,需要与MapReduce的JobHistoryServer的IPC服务进行通信,所以在安装Hadoop时需要允许JobHistoryServer的IPC主机和端口被外部其它节点访问,只需要修改etc/hadoop/mapreduce-site.xml配置文件,增加如下配置即可:

1 <property>
2 <name>mapreduce.jobhistory.address</name>
3 <value>10.10.4.130:10020</value>
4 <description>MapReduce JobHistory Server IPC host:port</description>
5 </property>

如果第一次安装Hadoop没有配置该属性mapreduce.jobhistory.address,那么Hadoop集群的所有节点上会使用默认的配置值为0.0.0.0:10020,所以如果不在NameNode上安装Pig程序,导致Pig所在的节点上安装的Hadoop的配置属性mapreduce.jobhistory.address使用默认值,也就是Pig所在节点0.0.0.0:10020,所以Pig脚本就会执行过程中与本机的10020端口通信,显然会失败的。
其实,如果已经在NameNode上启动了JobHistoryServer进程,只需要修改mapreduce.jobhistory.address的属性值,然后同步到所有安装Hadoop文件的节点,包括Pig所在节点即可,不需要重启NameNode节点上的JobHistoryServer进程。如果没有在NameNode上启动JobHistoryServer进程,执行如下命令启动即可:

1 mr-jobhistory-daemon.sh start historyserver
目录
相关文章
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
55 1
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
125 4
|
1天前
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
5月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
72 3
|
5月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
66 2
|
2月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
140 61
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
3月前
|
存储 小程序 Apache
10月26日@杭州,飞轮科技 x 阿里云举办 Apache Doris Meetup,探索保险、游戏、制造及电信领域数据仓库建设实践
10月26日,由飞轮科技与阿里云联手发起的 Apache Doris 杭州站 Meetup 即将开启!
86 0
|
5月前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
当今社会,物联网技术的发展带来了许多繁琐的挑战,尤其是在数据库管理系统领域,比如实时整合海量数据、处理流中的事件以及处理数据的安全性。例如,应用于智能城市的基于物联网的交通传感器可以实时生成大量的交通数据。据估计,未来5年,物联网设备的数量将达数万亿。物联网产生大量的数据,包括流数据、时间序列数据、RFID数据、传感数据等。要有效地管理这些数据,就需要使用数据库。数据库在充分处理物联网数据方面扮演着非常重要的角色。因此,适当的数据库与适当的平台同等重要。由于物联网在世界上不同的环境中运行,选择合适的数据库变得非常重要。 原创文字,IoTDB 社区可进行使用与传播 一、什么是IoTDB 我
216 9
Apache IoTDB进行IoT相关开发实践
|
5月前
|
SQL 运维 分布式计算
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决
60 1
Apache Flink 实践问题之避免用户作业包中包含Flink的core包如何解决

热门文章

最新文章

相关实验场景

更多

推荐镜像

更多