Hadoop 中的join分为三种
- Reduce端join,适合于两个大表
- Map端join,适合一个大表和一个小表,小表放到 Distributed Cache里面
- Semi join,适合join只用到其中一个表中一小部分数据的情况
Reduce端join
- 读入两个大表,对value按文件进行标记
- 在Reduce端收集属于不同文件的value到不同的list,对同一key的不同list中的value做笛卡尔积
- Logger 用来记录错误
- Counter 用来记数想要的一些数据
- configuration context用来传递数据
/**
 * Reduce-side join of two whitespace-delimited text tables keyed by their first column.
 *
 * <p>The mapper tags each value with the table it came from ("u#" for t1, "l#" for t2).
 * The reducer separates the tagged values of one key into two lists and emits their
 * Cartesian product.
 *
 * <p>Command line: {@code <input path> <output path> <t1 file name> <t2 file name>}.
 */
public class ReduceJoin {
    private static final String DELIMITER = "\\s+";
    private static final Logger LOG = Logger.getLogger(ReduceJoin.class);

    /**
     * Tags every record with its source table, decided by matching the input split's path
     * against the file names stored in the job configuration.
     */
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        /** Path fragment identifying t1 input files; null when not configured. */
        private String t1;
        /** Path fragment identifying t2 input files; null when not configured. */
        private String t2;

        /** Reads the table file names once per task instead of once per record. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            t1 = conf.get("t1FileName");
            t2 = conf.get("t2FileName");
        }

        /**
         * Emits (key, "u#" + field1) for t1 records and (key, "l#" + field2 TAB field3) for
         * t2 records. Blank lines and records with an unexpected field count are skipped.
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Trim before splitting: a leading blank would otherwise produce an empty first field.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String path = ((FileSplit) context.getInputSplit()).getPath().toString();
            String[] values = line.split(DELIMITER);

            // Null guards: String.contains(null) would throw if a configuration key is missing.
            if (t1 != null && path.contains(t1)) {
                if (values.length != 2) {
                    LOG.error("t1 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t1 read records").increment(1);
                context.write(new Text(values[0]), new Text("u#" + values[1]));
            } else if (t2 != null && path.contains(t2)) {
                if (values.length != 4) {
                    LOG.error("t2 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t2 read records").increment(1);
                // Only the third and fourth columns of t2 take part in the join output.
                context.write(new Text(values[0]), new Text("l#" + values[2] + "\t" + values[3]));
            } else {
                context.getCounter("MapStage", "map filtered records").increment(1);
            }
        }
    }

    /** Joins the tagged values of one key by emitting the Cartesian product of both sides. */
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            List<String> t1 = new ArrayList<String>();
            List<String> t2 = new ArrayList<String>();
            for (Text value : values) {
                // Limit 2 splits on the tag separator only, so a payload containing '#' is kept
                // intact instead of being dropped by the length check.
                String[] fields = value.toString().split("#", 2);
                if (fields.length != 2) {
                    continue;
                }
                if ("u".equals(fields[0])) {
                    t1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    t2.add(fields[1]);
                }
            }
            for (String it1 : t1) {
                for (String it2 : t2) {
                    context.write(key, new Text(it1 + "\t" + it2));
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args input path, output path, t1 file name, t2 file name
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            // Fail loudly instead of returning silently with a success exit code.
            System.err.println("Usage: ReduceJoin <input path> <output path> <t1 file name> <t2 file name>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("t1FileName", args[2]);
        conf.set("t2FileName", args[3]);
        Job job = new Job(conf, "join");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Map端join
- 适用于一大一小两个表
- 小表装进Distributed Cache里
/**
 * Map-side join of a large table with a small one.
 *
 * <p>The small table is shipped through the distributed cache under the symlink name "t1"
 * and loaded into an in-memory map by every mapper; each record of the large table is then
 * joined by a key lookup.
 */
public class MapJoin {
    private static final Logger LOG = Logger.getLogger(MapJoin.class);

    /** Joins each large-table record against the cached small table. */
    protected static class MapJoinMapper extends Mapper<Object,Text,Text,Text>{
        /** Small table: first column mapped to second column. */
        private Map<String,String> map = new HashMap<String,String>();

        /**
         * Loads the cached small table into memory. Blank and malformed lines are skipped
         * (malformed ones are counted) so that one bad line does not discard the rest.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("t1"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    line = line.trim();
                    if (line.isEmpty()) {
                        // continue, not return: returning would stop loading the table here.
                        continue;
                    }
                    String[] fields = line.split("\\s+");
                    if (fields.length != 2) {
                        context.getCounter("MapStage","Input Record Fields Count Error").increment(1);
                        continue;
                    }
                    map.put(fields[0], fields[1]);
                }
            } finally {
                // Close the reader on every exit path, including exceptions.
                br.close();
            }
        }

        /**
         * Emits (key, smallTableValue TAB field2 TAB field3) for every large-table record
         * whose key exists in the small table. Records without exactly four fields are
         * counted and skipped.
         */
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                context.getCounter("ReduceStage","Map output Record Fields Count Error").increment(1);
                // Skip the record: reading fields[2]/fields[3] below would be out of bounds.
                return;
            }
            String joined = map.get(fields[0]);
            if (joined != null) {
                context.write(new Text(fields[0]), new Text(joined + "\t" + fields[2] + "\t" + fields[3]));
            }
        }
    }

    /**
     * Registers the small table in the distributed cache and runs the job.
     *
     * @param args input path of the large table, output path
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        // The "#t1" fragment is the name under which tasks open the cached file.
        DistributedCache.addCacheFile(new URI("hdfs://namenode/user/zhanghu/cache/t1#t1"), conf);
        Job job = new Job(conf,"MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // NOTE(review): the number of reduce tasks is not set to 0 here, so a reduce phase
        // presumably still runs with the default reducer - confirm whether a map-only job
        // (job.setNumReduceTasks(0)) is intended.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
Semi Join
- 在map端进行数据过滤,只传输参与join的数据,减少shuffle阶段网络传输量
- 前提是存在于Logs中的UserId字段可以被放入到Cache中
- 实现方法
- 首先对右表中的UserId字段进行去重,保存在UniqueUsers
- 利用DistributedCache去除User表中UserId不在右表中的数据
/**
 * De-duplication job: extracts the distinct values of the first column from
 * four-column, whitespace-delimited input records.
 *
 * <p>Each mapper collects the keys it sees in a set and emits them once in
 * {@code cleanup}; the reducer then emits every key exactly once.
 */
public class RemoveDuplicates {
    /** Collects the distinct first-column values seen by this map task. */
    public static class RemoveDuplicatesMapper extends Mapper<Object, Text, Text, NullWritable> {
        /** Distinct keys seen so far by this task; flushed in {@link #cleanup}. */
        Set<Text> set = new HashSet<Text>();

        /** Remembers the first field of every record that has exactly four fields. */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Trim first so leading whitespace does not create an empty first field.
            String line = value.toString().trim();
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                return;
            }
            set.add(new Text(fields[0]));
        }

        /** Emits each distinct key collected by this task once. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text value : set) {
                context.write(value, NullWritable.get());
            }
        }
    }

    /**
     * Emits every key once, which removes duplicates across map tasks. The input value
     * type is {@code NullWritable}, matching what the mapper actually emits.
     */
    public static class RemoveDuplicatesReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException,
                InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Configures and runs the de-duplication job.
     *
     * @param args input path, output path
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "RemoveDuplicates");
        job.setJarByClass(RemoveDuplicates.class);
        job.setMapperClass(RemoveDuplicatesMapper.class);
        job.setReducerClass(RemoveDuplicatesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
/**
 * Semi join: the same as the reduce-side join, except that t1 (user) records whose key
 * is not in the cached "UniqueUsers" set are dropped in the mapper.
 */
public class SemiJoin {
    /** Tags records by source table and filters t1 records against the cached key set. */
    public static class SemiJoinMapper extends Mapper<Object,Text,Text,Text>{
        /** Keys loaded from the cached "UniqueUsers" file. */
        Set<String> set = new HashSet<String>();

        /** Loads the non-blank lines of the cached "UniqueUsers" file into the key set. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("UniqueUsers"));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String trimmed = line.trim();
                    if (!trimmed.isEmpty()) {
                        set.add(trimmed);
                    }
                }
            } finally {
                // Close the reader even when reading fails.
                br.close();
            }
        }

        /**
         * Emits (key, "u#" + field1) for t1 records whose key is in the cached set, and
         * (key, "l#" + field2 TAB field3) for t2 records. Records with an unexpected
         * field count are skipped.
         */
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            FileSplit split = (FileSplit)context.getInputSplit();
            String path = split.getPath().toString();
            // Trim first so leading whitespace does not create an empty first field.
            String[] fields = value.toString().trim().split("\\s+");
            if (path.contains("t1")) {
                if (fields.length != 2) {
                    return;
                }
                if (set.contains(fields[0])) {
                    context.write(new Text(fields[0]), new Text("u#" + fields[1]));
                }
            } else if (path.contains("t2")) {
                if (fields.length != 4) {
                    return;
                }
                context.write(new Text(fields[0]), new Text("l#" + fields[2] + "\t" + fields[3]));
            } else {
                context.getCounter("MapStage","Invalid Records").increment(1);
            }
        }
    }

    /** Joins the tagged values of one key by emitting the Cartesian product of both sides. */
    public static class SemiJoinReducer extends Reducer<Text,Text,Text,Text>{
        private List<String> listT1 = new ArrayList<String>();
        private List<String> listT2 = new ArrayList<String>();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            // The lists are instance fields shared by every reduce() call; without this reset,
            // values of earlier keys would be joined into the output of later keys.
            listT1.clear();
            listT2.clear();
            for (Text value : values) {
                // Limit 2 splits on the tag separator only, so a payload containing '#' stays intact.
                String[] fields = value.toString().split("#", 2);
                if (fields.length != 2) {
                    // Untagged value: skip it instead of failing on fields[1].
                    continue;
                }
                if ("u".equals(fields[0])) {
                    listT1.add(fields[1]);
                } else if ("l".equals(fields[0])) {
                    listT2.add(fields[1]);
                }
            }
            for (String t1 : listT1) {
                for (String t2 : listT2) {
                    context.write(key, new Text(t1 + "\t" + t2));
                }
            }
        }
    }

    /**
     * Registers the unique-key file in the distributed cache and runs the job.
     *
     * @param args input path, output path
     * @throws Exception if the job cannot be set up or fails while running
     */
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        // The "#UniqueUsers" fragment is the name under which tasks open the cached file.
        DistributedCache.addCacheFile(new URI("/user/zhanghu/cache/UniqueUsers#UniqueUsers"),conf);
        Job job = new Job(conf,"SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
改进方案
- 第二步中还是用到了ReduceJoin所以还是需要传输较多数据
- 前提经过过滤后的用户表可以被全部放入到cache中
- 实现方案
- 对右表中的UserID字段进行去重,保存在UniqueUsers中
- 以UniqueUsers作为cache对Users表进行过滤,得到FilteredUsers
- 以FilteredUsers作为cache,与UserLog进行Map端连接
- 改进方案的特点
- 优点:三个步骤全部只有Map,没有Shuffle阶段,完全并行
- 缺点:需要启动三个作业,且要多次读入Cache,如果Cache比较大得不偿失