Pig是一种数据流编程语言,由一系列操作和变换构成,每一个操作或者变换都对输入进行处理,然后产生输出结果,整体操作表示一个数据流。Pig的执行环境将数据流翻译为可执行的内部表示,在Pig内部,这些变换操作被转换为一系列的MapReduce作业。

      Pig自身有许多个方法,有时候需要我们自己定制特定的处理方法即UDF。

      UDF具体的步骤如下

第一步,继承计算类或者过滤类或者加载类或者存储类,重写里面的需要实现的方法,将写好的类进行打包生成jar文件。诸如命名为example.jar

第二步,进入Pig的grunt中,利用register将打包的文件注册进入Pig中。进入Pig的grunt中,当前本地路径就是用户输入Pig时候所在的路径。打包文件一定要加上它所在的路径。如register example.jar。

第三步,直接使用该自定义的UDF,在使用的过程中需要加上该类的权限定包名,如果这里example.jar的包结构为com.whut.FilterFunct。则引用的时候就是com.whut.FilterFunct(参数)。注意类的名称就是使用时候的方法名,必须要区分大小写。

第四步,为自己的UDF定义别名,这样使用的时候就不许要加包名了,如

define Goog com.whut.FilterFunct()。这样使用的时候就直接利用Goog了。

自定义过滤UDF:

        过滤UDF需要继承FilterFunc。实现其exec方法。该方法返回的是boolean型。在对温度统计的时候,就可以利用过滤UDF来过滤是否正确的气温。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package  whut;
import  java.io.IOException;
import  java.util.ArrayList;
import  java.util.List;
import  org.apache.pig.FilterFunc;
import  org.apache.pig.FuncSpec;
import  org.apache.pig.backend.executionengine.ExecException;
import  org.apache.pig.data.DataType;
import  org.apache.pig.data.Tuple;
import  org.apache.pig.impl.logicalLayer.FrontendException;
//删除记录中不符合要求的记录
//pig的自定义函数,过滤函数
public  class  IsGoodQuality  extends  FilterFunc{
     @Override
     public  Boolean exec(Tuple tuple)  throws  IOException {
         // TODO Auto-generated method stub
         if (tuple == null  ||tuple.size()== 0 )
         return  false ;
         try {
             Object obj=tuple.get( 0 );
             if (obj== null )
                 return  false ;
             //这里强制转换为一个整形
             int  i=(Integer)obj;
             return  i== 0  ||i== 1  || i== 2  || i== 3 ;
         } catch (ExecException e)
         {
             throw  new  IOException(e);
         }
     }
}

        这里的参数是一个元组,可以包含多个输入参数,在方法中直接利用get(索引位置)来直接获取。

自定义加载函数UDF

      在Pig中经常会使用到加载外部文件,一般使用Load进行加载,如Load 'input/tempdata' as (a:chararray,b:int) 。这里默认使用了内部加载存储函数,PigStorage。

Load 'input/tempdata' using PigStorage()  as (a:chararray,b:int)。这里PigStorage默认的每一行的字段分割符是制表符,当然也可以传递一个自己的字段分割符号。有时候每一行是一串字符串,想从中取出某一个字段,则就需要自己定义一个加载函数。以下面这个文件为例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
aaaaa1990aaaaaa0039a
bbbbb1991bbbbbb0045a
ccccc1992cccccc0011c
ddddd1993dddddd0043d
eeeee1994eeeeee0047e
aaaaa1990aaaaaa0037a
bbbbb1991bbbbbb0027a
ccccc1992cccccc0032c
ddddd1993dddddd0090d
eeeee1994eeeeee0091e
aaaaa1980aaaaaa0041a
bbbbb1981bbbbbb0050a
ccccc1992cccccc0020c
ddddd1993dddddd0033d
eeeee1984eeeeee0061e
aaaaa1980aaaaaa0054a
bbbbb1991bbbbbb0075a
ccccc1982cccccc0011c
ddddd1993dddddd0003d
eeeee1974eeeeee0041e
aaaaa1990aaaaaa0039a
bbbbb1961bbbbbb0041a
ccccc1972cccccc0070c
ddddd1993dddddd0042d
eeeee1974eeeeee0043e
aaaaa1990aaaaaa0034a
bbbbb1971bbbbbb0025a
ccccc1992cccccc0056c
ddddd1993dddddd0037d
eeeee1984eeeeee0038e
aaaaa1990aaaaaa0049a
bbbbb1991bbbbbb0011a
ccccc1962cccccc0012c
ddddd1993dddddd0023d
eeeee1984eeeeee0031e
aaaaa1980aaaaaa0094a
bbbbb1971bbbbbb0045a
ccccc1992cccccc0041c
ddddd1993dddddd0003d
eeeee1984eeeeee0081e
aaaaa1960aaaaaa0099a
bbbbb1971bbbbbb0050a
ccccc1952cccccc0055c
ddddd1963dddddd0043d
eeeee1994eeeeee0041e
aaaaa1990aaaaaa0031a
bbbbb1991bbbbbb0020a
ccccc1952cccccc0030c
ddddd1983dddddd0013d
eeeee1974eeeeee0061e
aaaaa1980aaaaaa0071a
bbbbb1961bbbbbb0060a
ccccc1992cccccc0080c
ddddd1953dddddd0033d
eeeee1964eeeeee0051e
aaaaa1960aaaaaa0024a
bbbbb1951bbbbbb0035a
ccccc1952cccccc0048c
ddddd1953dddddd0053d
eeeee1954eeeeee0048e

       为了从中取出年份和温度,则就需要自己定义加载函数,这里每一列序号以0开始。自定义加载函数需要继承LoadFunc。具体的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package  whut;
import  java.io.IOException;
import  java.util.ArrayList;
import  java.util.List;
import  org.apache.commons.logging.Log;
import  org.apache.commons.logging.LogFactory;
import  org.apache.hadoop.io.Text;
import  org.apache.hadoop.mapreduce.InputFormat;
import  org.apache.hadoop.mapreduce.Job;
import  org.apache.hadoop.mapreduce.RecordReader;
import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import  org.apache.pig.LoadFunc;
import  org.apache.pig.backend.executionengine.ExecException;
import  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import  org.apache.pig.data.DataByteArray;
import  org.apache.pig.data.Tuple;
import  org.apache.pig.data.TupleFactory;
class  Range
{
     //列的索引以0开始
     //字段分割的列的位置
     private  int  start;
     private  int  end;
     //根据输入来解析
     //字符串格式必须是(2~3,5~6)
     public  static  List<Range> parse(String cutStr) throws  Exception
     {
         List<Range> rangeList= new  ArrayList<Range>();
         //首先要判断是否格式正确
         boolean  state=cutStr.matches( "\\d+~\\d+(,\\d+~\\d+)*" );
         if (!state)
         {
           throw  new  Exception( "InputForat Error:\n"  +
             "Usage:number~number,number~number;Such 2~7,10~19" );
         }
         //先截取几个字段的列起止位置如2~8 
         String[] splits=cutStr.split( "," );
         //遍历长度设置Range
         for ( int  i= 0 ;i<splits.length;i++)
         {
             Range range= new  Range();
             String sub=splits[i];
             String[] subSplits=sub.split( "~" );
             int  subStart=Integer.parseInt(subSplits[ 0 ]);
             int  subEnd=Integer.parseInt(subSplits[ 1 ]);
             if (subStart>subEnd)
             throw  new  Exception( "InputForat Error:\n"  +
                     "Detail:first number must less than second number" );
             range.setStart(subStart);
             range.setEnd(subEnd);
             rangeList.add(range);
         }
         return  rangeList;
     }
     public  int  getStart() {
         return  start;
     }
     public  void  setStart( int  start) {
         this .start = start;
     }
     public  int  getEnd() {
         return  end;
     }
     public  void  setEnd( int  end) {
         this .end = end;
     }
               
     public  String getSubString(String inStr)
     {
         String res=inStr.substring(start, end);
         return  res;
     }
}
//定义加载函数,从每一行字符串提出年份,温度
public  class  LineLoadFunc  extends  LoadFunc{
               
     private  static  final  Log LOG=LogFactory.getLog(LineLoadFunc. class );
     //负责产生元组的各个字段
     private  final  TupleFactory tupleFactory=TupleFactory.getInstance();
     //负责读取输入记录
     private  RecordReader reader;
     //存每个字段的集合
     private  List<Range> ranges;
     //传递参数设置列的位置分割
     public  LineLoadFunc(String cutPattern) throws  Exception
     {
         ranges=Range.parse(cutPattern);
     }
     //设置文件的加载位置
     @Override
     public  void  setLocation(String location, Job job)  throws  IOException {
         FileInputFormat.setInputPaths(job, location);
     }
     //设置加载文件的输入文件格式
     //为每一个分片建立一个RecordReader
     @Override
     public  InputFormat getInputFormat()  throws  IOException {
         return  new  TextInputFormat();
     }
     @Override
     public  void  prepareToRead(RecordReader reader, PigSplit split)
             throws  IOException {
         this .reader=reader;
     }
     @Override
     public  Tuple getNext()  throws  IOException {
         // TODO Auto-generated method stub
         try {
             if (!reader.nextKeyValue())
                 return  null ;
             //TextInputFormat
             //key:LongWritable,value:Text
             Text value=(Text)reader.getCurrentValue();
             String line=value.toString();
             //设置每一个元组有几个字段
             Tuple tuple=tupleFactory.newTuple(ranges.size());
             for ( int  i= 0 ;i<ranges.size();i++)
             {
                 Range range=ranges.get(i);
                 if (range.getEnd()>line.length())
                 {
                     throw  new  ExecException( "InputFormat:Error\n"  +
                             "field length more than total length" );
                 }
                 //必须使用DataByteArray来构造字段的类型
                 tuple.set(i,  new  DataByteArray(range.getSubString(line)));
             }
             return  tuple;
         } catch (InterruptedException e)
         {
             throw  new  ExecException();
         }
     }
}


           具体使用的方法就是按照刚才所说的步骤进行的。