Problem description:
A trade table:
product1"trade1
product2"trade2
product3"trade3
A pay table:
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6
Join the two tables on the shared product column; the relationship is one-to-many (one trade record can match several pay records). The desired output pairs each trade with its pay records, for example (tab-separated):
trade1	pay1
trade1	pay4
trade2	pay2
...
Approach:
To join the two tables, note that they share the same first column (the product) and that the first table relates to the second one-to-many. As in a secondary-sort job, we group records by product and sort within each group; as long as the trade record is guaranteed to reach the reduce() call first, the pay records can simply be iterated after it.
To make the trade record arrive first, define a composite key with two fields: the first is the product, and the second is a flag of 0 or 1 marking whether the record comes from the trade table or the pay table. Sorting each group in ascending order of the flag puts the trade record ahead of the pay records.
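For example, with the sample data above, the reduce group for product1 receives its values in this order (the trade record first, because its flag 0 sorts ahead of the pay flag 1):

(product1, 0) -> trade1
(product1, 1) -> pay1
(product1, 1) -> pay4

The two pay records may arrive in either order relative to each other; only the trade-before-pay ordering matters here.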
The code:
The custom composite key
package whut.onetomany;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TextIntPair implements WritableComparable<TextIntPair> {
    // The product, e.g. "product1"
    private String firstKey;
    // Table flag: 0 = trade table, 1 = pay table.
    // Sorting the flag in ascending order within a group puts the trade record first.
    private int secondKey;

    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    // Natural ordering: by product first, then by the table flag, so the key
    // sorts sensibly even without the custom sort comparator below.
    @Override
    public int compareTo(TextIntPair tip) {
        int cmp = this.getFirstKey().compareTo(tip.getFirstKey());
        if (cmp != 0)
            return cmp;
        return this.getSecondKey() - tip.getSecondKey();
    }
}
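As a quick sanity check outside the cluster, here is a minimal sketch (not part of the original post) that round-trips a TextIntPair through its write()/readFields() methods using plain java.io streams; the class name TextIntPairRoundTrip is an illustrative choice.

package whut.onetomany;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class TextIntPairRoundTrip {
    public static void main(String[] args) throws Exception {
        TextIntPair original = new TextIntPair();
        original.setFirstKey("product1");
        original.setSecondKey(0); // 0 = trade record

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields()
        TextIntPair copy = new TextIntPair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getFirstKey() + " " + copy.getSecondKey()); // product1 0
    }
}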
The grouping comparator
package whut.onetomany;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: keys with the same first field (the product) are
// delivered to the same reduce() call, regardless of the table flag.
public class TextComparator extends WritableComparator {
    protected TextComparator() {
        // Register the key class so instances can be created for comparison
        super(TextIntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextIntPair tip1 = (TextIntPair) a;
        TextIntPair tip2 = (TextIntPair) b;
        return tip1.getFirstKey().compareTo(tip2.getFirstKey());
    }
}
The in-group sort comparator: ensures the trade record reaches the reducer before the pay records
package whut.onetomany;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: orders keys by product first, then by the table flag, so
// that within each group the trade record (flag 0) precedes the pay records (flag 1).
public class TextIntComparator extends WritableComparator {
    public TextIntComparator() {
        super(TextIntPair.class, true);
    }

    // Compares a against b: a negative result sorts a before b (ascending),
    // a positive result sorts b before a (descending).
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextIntPair ti1 = (TextIntPair) a;
        TextIntPair ti2 = (TextIntPair) b;
        // Keys belong to the same group only when the first field matches,
        // so compare the product first ...
        if (!ti1.getFirstKey().equals(ti2.getFirstKey()))
            return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        // ... then order by the table flag (0 before 1) within the group
        else
            return ti1.getSecondKey() - ti2.getSecondKey();
    }
}
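To see the division of labour between the two comparators, here is a minimal sketch (not from the original post) comparing a trade key and a pay key for the same product: the grouping comparator treats them as equal (one reduce group), while the sort comparator still puts the trade key first. The class name ComparatorDemo is illustrative.

package whut.onetomany;

public class ComparatorDemo {
    public static void main(String[] args) {
        TextIntPair trade = new TextIntPair();
        trade.setFirstKey("product1");
        trade.setSecondKey(0);

        TextIntPair pay = new TextIntPair();
        pay.setFirstKey("product1");
        pay.setSecondKey(1);

        // Grouping comparator: 0 means both keys fall into the same reduce() call
        System.out.println(new TextComparator().compare(trade, pay));    // 0

        // Sort comparator: negative means the trade record is iterated first
        System.out.println(new TextIntComparator().compare(trade, pay)); // -1
    }
}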
The partitioner:
package whut.onetomany;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition by the first key only, so that trade and pay records for the
// same product are sent to the same reducer.
public class PartitionByText extends Partitioner<TextIntPair, Text> {
    @Override
    public int getPartition(TextIntPair key, Text value, int numPartitions) {
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
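Likewise, a small sketch (not from the original post; the class name PartitionDemo is illustrative) confirming that trade and pay keys for the same product land in the same partition, because only the first key is hashed:

package whut.onetomany;

import org.apache.hadoop.io.Text;

public class PartitionDemo {
    public static void main(String[] args) {
        PartitionByText partitioner = new PartitionByText();

        TextIntPair trade = new TextIntPair();
        trade.setFirstKey("product1");
        trade.setSecondKey(0);

        TextIntPair pay = new TextIntPair();
        pay.setFirstKey("product1");
        pay.setSecondKey(1);

        // Both calls hash only firstKey, so they print the same partition number
        System.out.println(partitioner.getPartition(trade, new Text("trade1"), 3));
        System.out.println(partitioner.getPartition(pay, new Text("pay1"), 3));
    }
}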
The MapReduce job (mapper, reducer, and driver)
package whut.onetomany;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMain extends Configured implements Tool {

    public static class JoinMapper extends Mapper<LongWritable, Text, TextIntPair, Text> {
        private TextIntPair tp = new TextIntPair();
        private Text val = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the path of the file this split belongs to
            FileSplit file = (FileSplit) context.getInputSplit();
            String fileName = file.getPath().toString();
            // Split the input line on the '"' delimiter: column 0 is the product,
            // column 1 is the trade/pay value
            String line = value.toString();
            String[] lineKeyValue = line.split("\"");
            String lineKey = lineKeyValue[0];
            String lineValue = lineKeyValue[1];
            tp.setFirstKey(lineKey);
            // Records from the trade file get flag 0 ...
            if (fileName.indexOf("trade") >= 0) {
                tp.setSecondKey(0);
                val.set(lineValue);
            }
            // ... and records from the pay file get flag 1
            else if (fileName.indexOf("pay") >= 0) {
                tp.setSecondKey(1);
                val.set(lineValue);
            }
            context.write(tp, val);
        }
    }

    public static class JoinReducer extends Reducer<TextIntPair, Text, Text, Text> {
        @Override
        protected void reduce(TextIntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> valList = values.iterator();
            // The sort comparator guarantees the trade record comes first.
            // Copy it into an immutable String: Hadoop reuses the same Text
            // instance while iterating, so keeping the Text reference
            // (e.g. Text trade = valList.next();) would see it overwritten.
            String tradeName = valList.next().toString();
            while (valList.hasNext()) {
                Text pay = valList.next();
                context.write(new Text(tradeName), pay);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "JoinJob");
        job.setJarByClass(JoinMain.class);
        // ToolRunner has already run GenericOptionsParser over the command line,
        // so args here holds only the remaining application arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
        // Alternative: a single comma-separated list of input paths
        // FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        // Partitioning strategy
        job.setPartitionerClass(PartitionByText.class);
        // Grouping and in-group sorting strategies
        job.setGroupingComparatorClass(TextComparator.class);
        job.setSortComparatorClass(TextIntComparator.class);
        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new JoinMain(), args);
        System.exit(code);
    }
}
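A sketch of how the job might be launched; the jar name and paths below are placeholders, the only real requirement from the mapper being that the trade input path contains the string "trade" and the pay input path contains "pay":

hadoop jar onetomany.jar whut.onetomany.JoinMain /input/trade /input/pay /output/join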
Note:
Some write-ups of this join omit the in-group sort comparator, but repeated testing showed that without it there is no guarantee the trade record reaches the reducer ahead of the pay records, which is why a custom sort comparator is defined here. The Hadoop version used is 1.1.2.
This article is reposted from zhao_xiao_long's 51CTO blog; original post: http://blog.51cto.com/computerdragon/1287744