package com.baidu.uilt;

import java.io.*;

import org.apache.hadoop.io.*;

/**
 * A pair of Text values, usable as a composite MapReduce key.
 */
public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }

    /**
     * Raw comparator: compares serialized TextPairs byte by byte,
     * without deserializing them into objects.
     */
    public static class Comparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public Comparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                // Length of the serialized first Text: vint header plus payload.
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
                if (cmp != 0) {
                    return cmp;
                }
                return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                               b2, s2 + firstL2, l2 - firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    static {
        // Register the raw comparator as the default for TextPair.
        WritableComparator.define(TextPair.class, new Comparator());
    }

    /**
     * Comparator that looks only at the first element of the pair,
     * used as the grouping comparator in the join job below.
     */
    public static class FirstComparator extends WritableComparator {

        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
}
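
For reference, the small demo class below is not part of the original post; it just illustrates the two orderings TextPair provides. compareTo orders by the first element and then the second, while FirstComparator considers only the first element, which is what lets the join job that follows group both tables' records under one reduce call.

package com.baidu.uilt;

public class TextPairDemo {
    public static void main(String[] args) {
        TextPair a = new TextPair("key1", "0");
        TextPair b = new TextPair("key1", "1");

        // Full ordering: first element, then second -> negative here (a < b).
        System.out.println(a.compareTo(b));

        // FirstComparator: only the first element is compared -> 0,
        // so a and b would fall into the same reduce group.
        TextPair.FirstComparator fc = new TextPair.FirstComparator();
        System.out.println(fc.compare(a, b));
    }
}

The job below wires these pieces together to join the two input files.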
package com.baidu.loan;

/***
 *
 * /home/users/ouerqiang/hadoop/hadoop-client-palo/hadoop/bin/hadoop jar LoanIdeaInfoText.jar com.baidu.loan.LoanIdeainfoJoinIterialByDAILI6 /test/fbiz/loan/ideainfo/LoanIdeainfoByDAILIUnitID_0928 /test/fbiz/loan/ideainfo/LoanIterialByDAI_0928 /test/fbiz/loan/ideainfo/LoanIdeainfoJoinIterialByDAILI6_1_0928
 *
 * **/

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.baidu.uilt.TextPair;

public class LoanIdeainfoJoinIterialByDAILI6 extends Configured implements Tool {

    /** Tags unit records (comma-separated, four fields) with "0" so they sort first within each key group. */
    public static class JoinUnitMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, TextPair, Text> {
        public void map(LongWritable key, Text value,
                        OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            String gbkStr = value.toString();
            if (gbkStr.split("\t").length < 2 && gbkStr.split(",").length == 4) {
                String[] strs = gbkStr.split(",");
                output.collect(new TextPair(strs[0], "0"), value);
            }
        }
    }

    /** Tags iterial records (tab-separated, more than four fields) with "1". */
    public static class JoinIterialMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, TextPair, Text> {
        public void map(LongWritable key, Text value,
                        OutputCollector<TextPair, Text> output, Reporter reporter)
                throws IOException {
            String gbkStr = value.toString();
            if (gbkStr.split("\t").length > 4) { // LoanIterial
                String[] strs = gbkStr.split("\t");
                output.collect(new TextPair(strs[0], "1"), value);
            }
        }
    }

    /**
     * Because of the secondary sort on the tag, the first value in each group
     * is the "0"-tagged unit record; every following value is an iterial record.
     */
    public static class JoinReducer extends MapReduceBase
            implements Reducer<TextPair, Text, Text, Text> {
        public void reduce(TextPair key, Iterator<Text> values,
                           OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Text stationName = new Text(values.next()); // the single unit record
            while (values.hasNext()) {
                Text record = values.next();
                Text outValue = new Text(stationName.toString() + "\t" + record.toString());
                output.collect(stationName, record);
                //output.collect(key.getFirst(), outValue);
            }
        }
    }

    /** Partitions on the first element only, so all records sharing a join key meet at one reducer. */
    public static class KeyPartitioner implements Partitioner<TextPair, Text> {
        @Override
        public void configure(JobConf job) {}

        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            return -1;
        }
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Join record with station name");

        String strPathUnit = args[0];
        String strPathIterial = args[1];
        Path outputPath = new Path(args[2]);

        MultipleInputs.addInputPath(conf, new Path(strPathUnit),
                TextInputFormat.class, JoinUnitMapper.class);
        MultipleInputs.addInputPath(conf, new Path(strPathIterial),
                TextInputFormat.class, JoinIterialMapper.class);
        FileOutputFormat.setOutputPath(conf, outputPath);

        conf.setPartitionerClass(KeyPartitioner.class);
        // Group values by the first element only, so both tables' records
        // for one key arrive in a single reduce call.
        conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class);
        conf.setMapOutputKeyClass(TextPair.class);
        conf.setReducerClass(JoinReducer.class);
        conf.setOutputKeyClass(Text.class);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LoanIdeainfoJoinIterialByDAILI6(), args);
        System.exit(exitCode);
    }
}
Note that the code above only handles a one-to-one (or one-to-many) relationship between the two tables; it does not handle many-to-many joins. To support a many-to-many relationship, an extra check is needed, along the lines of the sketch below.
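The original post does not show that check, but one common approach is sketched below (my addition, not the author's code): buffer the "0"-tagged unit records of a key group in memory, then cross-join each "1"-tagged iterial record against that buffer. This relies on the secondary sort above (all "0" records arrive before "1" records within a group), assumes the buffered side of each group is small enough to hold in memory, and uses the fact that in the old mapred API the key object is refilled as the value iterator advances, so the tag written by the mappers is readable inside the loop.

public static class ManyToManyJoinReducer extends MapReduceBase
        implements Reducer<TextPair, Text, Text, Text> {
    public void reduce(TextPair key, Iterator<Text> values,
                       OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // Buffer the "0" side; assumes it fits in memory for each key group.
        java.util.List<Text> unitRecords = new java.util.ArrayList<Text>();
        while (values.hasNext()) {
            Text record = values.next();
            // key.getSecond() reflects the tag of the current value,
            // because the key is re-deserialized as the iterator advances.
            if ("0".equals(key.getSecond().toString())) {
                unitRecords.add(new Text(record)); // copy: Hadoop reuses value objects
            } else {
                for (Text unit : unitRecords) {
                    output.collect(unit, record);
                }
            }
        }
    }
}

To try it, swap JoinReducer for ManyToManyJoinReducer in run().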
This article is reposted from the 51CTO blog of 梦朝思夕; original link: http://blog.51cto.com/qiangmzsx/1560553