问题描述:

一个trade table表

product1"trade1

product2"trade2

product3"trade3

一个pay table表

product1"pay1

product2"pay2

product2"pay3

product1"pay4

product3"pay5

product3"pay6

建立两个表之间的连接,该两表是一对多关系的

如下:

trade1pay1

trade1pay4

trade2pay2

...

思路:

       为了将两个表整合到一起,由于有相同的第一列,且第一个表与第二个表是一对多关系的。

这里依然采用分组,以及组内排序,只要保证一方最先到达reduce端,则就可以进行迭代处理了。

为了保证第一个表先到达reduce端,可以为定义一个组合键,包含两个值,第一个值为product,第二个值为0或者1,来分别代表第一个表和第二个表,只要按照组内升序排列即可。

具体代码:

自定义组合键策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package  whut.onetomany;
import  java.io.DataInput;
import  java.io.DataOutput;
import  java.io.IOException;
import  org.apache.hadoop.io.WritableComparable;
public  class  TextIntPair  implements  WritableComparable{
     //product1 0/1
     private  String firstKey; //product1
     private  int  secondKey; //0,1;0代表是trade表,1代表是pay表
     //只需要保证trade表在pay表前面就行,则只需要对组顺序排列
                                                           
     public  String getFirstKey() {
         return  firstKey;
     }
     public  void  setFirstKey(String firstKey) {
         this .firstKey = firstKey;
     }
     public  int  getSecondKey() {
         return  secondKey;
     }
     public  void  setSecondKey( int  secondKey) {
         this .secondKey = secondKey;
     }
     @Override
     public  void  write(DataOutput out)  throws  IOException {
         out.writeUTF(firstKey);
         out.writeInt(secondKey);
     }
     @Override
     public  void  readFields(DataInput in)  throws  IOException {
         // TODO Auto-generated method stub
         firstKey=in.readUTF();
         secondKey=in.readInt();
     }
                                                           
     @Override
     public  int  compareTo(Object o) {
         // TODO Auto-generated method stub
         TextIntPair tip=(TextIntPair)o;
         return  this .getFirstKey().compareTo(tip.getFirstKey());
     }
}

分组策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package  whut.onetomany;
import  org.apache.hadoop.io.WritableComparable;
import  org.apache.hadoop.io.WritableComparator;
public  class  TextComparator  extends  WritableComparator{
     protected  TextComparator() {
         super (TextIntPair. class , true ); //注册比较器
     }
     @Override
     public  int  compare(WritableComparable a, WritableComparable b) {
         // TODO Auto-generated method stub
         TextIntPair tip1=(TextIntPair)a;
         TextIntPair tip2=(TextIntPair)b;
         return  tip1.getFirstKey().compareTo(tip2.getFirstKey());
     }
}

组内排序策略:目的是保证第一个表比第二个表先到达

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package  whut.onetomany;
import  org.apache.hadoop.io.WritableComparable;
import  org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序
public  class  TextIntComparator  extends  WritableComparator {
     public  TextIntComparator()
     {
         super (TextIntPair. class , true );
     }
     //这里可以进行排序的方式管理
     //必须保证是同一个分组的
     //a与b进行比较
     //如果a在前b在后,则会产生升序
     //如果a在后b在前,则会产生降序
     @Override
     public  int  compare(WritableComparable a, WritableComparable b) {
         // TODO Auto-generated method stub
         TextIntPair ti1=(TextIntPair)a;
         TextIntPair ti2=(TextIntPair)b;
         //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
         if (!ti1.getFirstKey().equals(ti2.getFirstKey()))
            return  ti1.getFirstKey().compareTo(ti2.getFirstKey());
         else
            return  ti1.getSecondKey()-ti2.getSecondKey(); //0,-1,1
     }
                                      
}

分区策略:

1
2
3
4
5
6
7
8
9
10
package  whut.onetomany;
import  org.apache.hadoop.io.Text;
import  org.apache.hadoop.mapreduce.Partitioner;
public  class  PartitionByText  extends  Partitioner<TextIntPair, Text> {
     @Override
     public  int  getPartition(TextIntPair key, Text value,  int  numPartitions) {
         // TODO Auto-generated method stub
         return  (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
     }
}

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package  whut.onetomany;
import  java.io.IOException;
import  java.util.Iterator;
import  org.apache.hadoop.conf.Configuration;
import  org.apache.hadoop.conf.Configured;
import  org.apache.hadoop.fs.Path;
import  org.apache.hadoop.io.LongWritable;
import  org.apache.hadoop.io.Text;
import  org.apache.hadoop.mapreduce.Job;
import  org.apache.hadoop.mapreduce.Mapper;
import  org.apache.hadoop.mapreduce.Mapper.Context;
import  org.apache.hadoop.mapreduce.Reducer;
import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import  org.apache.hadoop.mapreduce.lib.input.FileSplit;
import  org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import  org.apache.hadoop.util.GenericOptionsParser;
import  org.apache.hadoop.util.Tool;
import  org.apache.hadoop.util.ToolRunner;
public  class  JoinMain  extends  Configured  implements  Tool {
     public  static  class  JoinMapper  extends  Mapper<LongWritable, Text, TextIntPair, Text>
     {
         private  TextIntPair tp= new  TextIntPair();
         private  Text val= new  Text();
         @Override
         protected  void  map(LongWritable key, Text value, Context context)
                 throws  IOException, InterruptedException {
             // TODO Auto-generated method stub
             //获取要处理的文件的名称
             FileSplit file=(FileSplit)context.getInputSplit();
             String fileName=file.getPath().toString();
             //获取输入行分隔
             String line=value.toString();
             String[] lineKeyValue=line.split( "\"" );
             String lineKey=lineKeyValue[ 0 ];
             String lineValue=lineKeyValue[ 1 ];
             tp.setFirstKey(lineKey);
             //判断是否是trade文件
             if (fileName.indexOf( "trade" )>= 0 )
             {
                 tp.setSecondKey( 0 );
                 val.set(lineValue);
             }
             //判断是否是pay文件
             else  if (fileName.indexOf( "pay" )>= 0 )
             {
                 tp.setSecondKey( 1 );
                 val.set(lineValue);
             }
             context.write(tp, val);
         }
     }
                       
     public  static  class  JoinReducer  extends  Reducer<TextIntPair, Text, Text, Text>
     {
         @Override
         protected  void  reduce(TextIntPair key, Iterable<Text> values,
                 Context context) throws  IOException, InterruptedException {
             Iterator<Text> valList=values.iterator();
             //注意这里一定要写成string不可变,写成Text有问题
             //Text trade=valList.next();
             String tradeName=valList.next().toString();
             while (valList.hasNext())
             {
                 Text pay=valList.next();
                 context.write( new  Text(tradeName), pay);
             }
         }
     }
     @Override
     public  int  run(String[] args)  throws  Exception
     {
         Configuration conf=getConf();
         Job job= new  Job(conf, "JoinJob" );
         job.setJarByClass(JoinMain. class );
         //ToolRunner已经利用GenericOptionsParser解析了命令行中的参数
         //并且将其存放在数组中,传递给该run()方法了
         FileInputFormat.addInputPath(job,  new  Path(args[ 0 ]));
         FileInputFormat.addInputPath(job,  new  Path(args[ 1 ]));
         //输入文件必须以,隔开
         //FileInputFormat.addInputPaths(job, args[0]);
         FileOutputFormat.setOutputPath(job,  new  Path(args[ 2 ]));
                           
         job.setMapperClass(JoinMapper. class );
         job.setReducerClass(JoinReducer. class );
         //设置分区方法
         job.setPartitionerClass(PartitionByText. class );
         //设置分组排序
         job.setGroupingComparatorClass(TextComparator. class );
         job.setSortComparatorClass(TextIntComparator. class );
                           
         job.setMapOutputKeyClass(TextIntPair. class );
         job.setMapOutputValueClass(Text. class );
         job.setOutputKeyClass(Text. class );
         job.setOutputValueClass(Text. class );
         job.waitForCompletion( true );
         int  exitCode=job.isSuccessful()? 0 : 1 ;
         return  exitCode;
     }
     public  static  void  main(String[] args) throws  Exception
     {
         // TODO Auto-generated method stub
         int  code=ToolRunner.run( new  JoinMain(), args);
         System.exit(code);
     }
}


注意:

     一般有些地方没有定义组内排序策略,但是经过多次测试,发现无法保证第一个表在第二个表之前到达,则这里就自定义了组内排序策略。版本号为Hadoop1.1.2