MapReduce 中的两表 join 实例(二)

简介:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package  com.baidu.uilt;
import  java.io.*;
 
import  org.apache.hadoop.io.*;
 
public  class  TextPair  implements  WritableComparable<TextPair> {
 
   private  Text first;
   private  Text second;
   
   public  TextPair() {
     set( new  Text(),  new  Text());
   }
   
   public  TextPair(String first, String second) {
     set( new  Text(first),  new  Text(second));
   }
   
   public  TextPair(Text first, Text second) {
     set(first, second);
   }
   
   public  void  set(Text first, Text second) {
     this .first = first;
     this .second = second;
   }
   
   public  Text getFirst() {
     return  first;
   }
 
   public  Text getSecond() {
     return  second;
   }
 
   @Override
   public  void  write(DataOutput out)  throws  IOException {
     first.write(out);
     second.write(out);
   }
 
   @Override
   public  void  readFields(DataInput in)  throws  IOException {
     first.readFields(in);
     second.readFields(in);
   }
   
   @Override
   public  int  hashCode() {
     return  first.hashCode() *  163  + second.hashCode();
   }
   
   @Override
   public  boolean  equals(Object o) {
     if  (o  instanceof  TextPair) {
       TextPair tp = (TextPair) o;
       return  first.equals(tp.first) && second.equals(tp.second);
     }
     return  false ;
   }
 
   @Override
   public  String toString() {
     return  first +  "\t"  + second;
   }
   
   @Override
   public  int  compareTo(TextPair tp) {
     int  cmp = first.compareTo(tp.first);
     if  (cmp !=  0 ) {
       return  cmp;
     }
     return  second.compareTo(tp.second);
   }
 
   public  static  class  Comparator  extends  WritableComparator {
     
     private  static  final  Text.Comparator TEXT_COMPARATOR =  new  Text.Comparator();
     
     public  Comparator() {
       super (TextPair. class );
     }
 
     @Override
     public  int  compare( byte [] b1,  int  s1,  int  l1,
                        byte [] b2,  int  s2,  int  l2) {
       
       try  {
         int  firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
         int  firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
         int  cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
         if  (cmp !=  0 ) {
           return  cmp;
         }
         return  TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                        b2, s2 + firstL2, l2 - firstL2);
       catch  (IOException e) {
         throw  new  IllegalArgumentException(e);
       }
     }
   }
 
   static  {
     WritableComparator.define(TextPair. class new  Comparator());
   }
   public  static  class  FirstComparator  extends  WritableComparator {
     
     private  static  final  Text.Comparator TEXT_COMPARATOR =  new  Text.Comparator();
     
     public  FirstComparator() {
       super (TextPair. class );
     }
 
     @Override
     public  int  compare( byte [] b1,  int  s1,  int  l1,
                        byte [] b2,  int  s2,  int  l2) {
       
       try  {
         int  firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
         int  firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
         return  TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
       catch  (IOException e) {
         throw  new  IllegalArgumentException(e);
       }
     }
     
     @Override
     public  int  compare(WritableComparable a, WritableComparable b) {
       if  (a  instanceof  TextPair && b  instanceof  TextPair) {
         return  ((TextPair) a).first.compareTo(((TextPair) b).first);
       }
       return  super .compare(a, b);
     }
   }
 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package  com.baidu.loan;
/***
 
  * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar  com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6  /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928  /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928  /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 
  * **/
import  java.io.IOException;
import  java.util.Iterator;
import  org.apache.hadoop.mapred.FileOutputFormat;
import  org.apache.hadoop.mapred.JobClient;
import  org.apache.hadoop.mapred.JobConf;
import  org.apache.hadoop.mapred.MapReduceBase;
import  org.apache.hadoop.mapred.Mapper;
import  org.apache.hadoop.mapred.OutputCollector;
import  org.apache.hadoop.mapred.Partitioner;
import  org.apache.hadoop.mapred.Reducer;
import  org.apache.hadoop.mapred.Reporter;
import  org.apache.hadoop.mapred.TextInputFormat;
import  org.apache.hadoop.mapred.lib.MultipleInputs;
import  org.apache.hadoop.util.Tool;
import  org.apache.hadoop.util.ToolRunner;
import  org.apache.hadoop.conf.Configured;
import  org.apache.hadoop.fs.Path;
import  org.apache.hadoop.io.LongWritable;
import  org.apache.hadoop.io.Text;
 
import  com.baidu.uilt.TextPair;
 
public  class  LoanIdeainfoJoinIterialByDAILI6  extends  Configured  implements  Tool {
 
     public  static  class  JoinUnitMapper  extends  MapReduceBase  implements
             Mapper<LongWritable, Text, TextPair, Text> {
 
         public  void  map(LongWritable key, Text value,
                 OutputCollector<TextPair, Text> output, Reporter reporter)
                 throws  IOException {
             String gbkStr = value.toString();
             if  (gbkStr.split( "\t" ).length <  2  && gbkStr.split( "," ).length ==  4 ) {
                 String[] strs = gbkStr.split( "," );
                 output.collect( new  TextPair(strs[ 0 ],  "0" ), value);
             }
 
         }
     }
 
     public  static  class  JoinIterialMapper  extends  MapReduceBase  implements
             Mapper<LongWritable, Text, TextPair, Text> {
 
         public  void  map(LongWritable key, Text value,
                 OutputCollector<TextPair, Text> output, Reporter reporter)
                 throws  IOException {
             String gbkStr = value.toString();
             if  (gbkStr.split( "\t" ).length >  4 ) { // LoanIterial
                 String[] strs = gbkStr.split( "\t" );
                 output.collect( new  TextPair(strs[ 0 ],  "1" ), value);
             }
         }
     }
 
     public  static  class  JoinReducer  extends  MapReduceBase  implements
             Reducer<TextPair, Text, Text, Text> {
 
         public  void  reduce(TextPair key, Iterator<Text> values,
                 OutputCollector<Text, Text> output, Reporter reporter)
                 throws  IOException {
 
             Text stationName =  new  Text(values.next());
             while  (values.hasNext()) {
                 Text record = values.next();
                 Text outValue =  new  Text(stationName.toString() +  "\t"
                         + record.toString());
                 output.collect(stationName, record);
                 //output.collect(key.getFirst(), outValue);
             }
         }
     }
     
     public  static  class  KeyPartitioner  implements  Partitioner<TextPair, Text> {
         @Override
         public  void  configure(JobConf job) {}
         
         @Override
         public  int  getPartition(TextPair key, Text value,  int  numPartitions) {
           return  (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
         }
       }
     
     @Override
     public  int  run(String[] args)  throws  Exception {
         if  (args.length !=  3 ) {
               return  - 1 ;
             }
             
             JobConf conf =  new  JobConf(getConf(), getClass());
             conf.setJobName( "Join record with station name" );
             
             String strPathUnit =args[ 0 ];
             String strPathIterial =args[ 1 ];
             Path outputPath=  new  Path(args[ 2 ]);
             
             MultipleInputs.addInputPath(conf,  new  Path(strPathUnit),
                 TextInputFormat. class , JoinUnitMapper. class );
             MultipleInputs.addInputPath(conf,  new  Path(strPathIterial),
                 TextInputFormat. class , JoinIterialMapper. class );
             FileOutputFormat.setOutputPath(conf, outputPath);
 
             conf.setPartitionerClass(KeyPartitioner. class );
             conf.setOutputValueGroupingComparator(TextPair.FirstComparator. class );
             
             conf.setMapOutputKeyClass(TextPair. class );
             
             conf.setReducerClass(JoinReducer. class );
 
             conf.setOutputKeyClass(Text. class );
             
             JobClient.runJob(conf);
             return  0 ;
     }
     
     public  static  void  main(String[] args)  throws  Exception {
         int  exitCode = ToolRunner.run( new  LoanIdeainfoJoinIterialByDAILI6(), args);
         System.exit(exitCode);
     }
 
     
 
}


需要注意的是上面的代码只是针对两表的一对一(多)关系,不满足多对多关系。如果需要满足多对多关系则需要加上一下判断即可。


本文转自 梦朝思夕 51CTO博客,原文链接:http://blog.51cto.com/qiangmzsx/1560553


相关文章
|
9月前
|
分布式计算 算法 数据库
32 MAPREDUCE的map端join算法实现
32 MAPREDUCE的map端join算法实现
27 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
3月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
57 0
|
9月前
|
存储 SQL 分布式计算
31 MAPREDUCE的reduce端join算法实现
31 MAPREDUCE的reduce端join算法实现
29 0
|
11月前
|
分布式计算 资源调度 Java
大数据MapReduce统计单词实例
大数据MapReduce统计单词实例
213 0
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
存储 分布式计算 资源调度
|
分布式计算
mapreduce辅助排序和序列化的实例练习
mapreduce辅助排序和序列化的实例练习
mapreduce辅助排序和序列化的实例练习
|
分布式计算 Java Hadoop
java:MapReduce原理及入门实例:wordcount
java:MapReduce原理及入门实例:wordcount
166 0
java:MapReduce原理及入门实例:wordcount