Hadoop Join

简介: Hadoop 中的join分为三种Reduce端join,适合于两个大表Map端join,适合一个大表和一个小表,小表放到 Distribute Cache里面semi join 当join只用到其中一个表中的一小...

Hadoop 中的join分为三种

  • Reduce端join,适合于两个大表
  • Map端join,适合一个大表和一个小表,小表放到 Distribute Cache里面
  • semi join 当join只用到其中一个表中的一小部分时

Reduce端join

  • 读入两个大表,对value按文件进行标记
  • 在Reduce端收集属于不同文件的value到不同的list,对同一key的不同list中的value做笛卡尔积
  • Logger 用来记录错误
  • Counter 用来记数想要的一些数据
  • configuration context用来传递数据
public class ReduceJoin {
    private static final String DELIMITER = "\\s+";
    private static final Logger LOG = Logger.getLogger(ReduceJoin.class);
    public static class JoinMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            String path = split.getPath().toString();
            Configuration conf = context.getConfiguration();
            String t1 = conf.get("t1FileName");
            String t2 = conf.get("t2FileName");
            String line = value.toString();
            if (line == null || line.trim().equals("")) {
                return;
            }
            String[] values = line.split(DELIMITER);
            if (path.contains(t1)) {
                if (values.length != 2) {
                    LOG.error("t1 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t1 read records").increment(1);
                context.write(new Text(values[0]), new Text("u#" + values[1]));
            } else if (path.contains(t2)) {
                if (values.length != 4) {
                    LOG.error("t2 Number of Fields Error");
                    return;
                }
                context.getCounter("MapStage", "t2 read records").increment(1);
                context.write(new Text(values[0]), new Text("l#" + values[2] + "\t" + values[3]));
            } else {
                context.getCounter("MapStage", "map filtered records").increment(1);
            }
        }
    }
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            List<String> t1 = new ArrayList<String>();
            List<String> t2 = new ArrayList<String>();
            for (Text value : values) {
                String[] fields = value.toString().split("#");
                if (fields.length != 2) {
                    continue;
                }
                if (fields[0].equals("u")) {
                    t1.add(fields[1]);
                } else if (fields[0].equals("l")) {
                    t2.add(fields[1]);
                } else {
                    continue;
                }
            }
            for (String it1 : t1) {
                for (String it2 : t2) {
                    context.write(key, new Text(it1 + "\t" + it2));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            return;
        }
        Configuration conf = new Configuration();
        conf.set("t1FileName", args[2]);
        conf.set("t2FileName", args[3]);
        Job job = new Job(conf, "join");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}  

Map端join

  • 适用于一大一小两个表
  • 小表装进Distribute Cache里
public class MapJoin {
    private static final Logger LOG = Logger.getLogger(MapJoin.class);

    protected static class MapJoinMapper extends Mapper<Object,Text,Text,Text>{
        private Map<String,String> map = new HashMap<String,String>();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("t1"));
            String line = null;
            while((line = br.readLine())!=null){
                if(line == null || line.equals("")){
                    return;
                }
                String[] fields = line.split("\\s+");
                if(fields.length!=2){
                    context.getCounter("MapStage","Input Record Fields Count Error").increment(1);
                    return;
                }
                map.put(fields[0], fields[1]);
            }
            br.close();
        }
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            String line = value.toString();
            if(line == null || line.equals("")){
                return;
            }
            String[] fields = line.split("\\s+");
            if(fields.length!=4){
                context.getCounter("ReduceStage","Map output Record Fields Count Error").increment(1);
            }
            if(map.containsKey(fields[0])){
                context.write(new Text(fields[0]), new Text(map.get(fields[0])+"\t"+fields[2]+"\t"+fields[3]));
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("hdfs://namenode/user/zhanghu/cache/t1#t1"), conf);
        Job job = new Job(conf,"MapJoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}  

Semi Join

  • 在map端进行数据过滤,只传输参与join的数据,减少shuffle阶段网络传输量
  • 前提是存在于Logs中的UserId字段可以被放入到Cache中
  • 实现方法
    • 首先对右表中的UserId字段进行去重,保存在UniqueUsers
    • 利用DistributeCache去除User表中UserId不在右表中的数据
/**
* 去重
**/
public class RemoveDuplicates {
    public static class RemoveDuplicatesMapper extends Mapper<Object, Text, Text, NullWritable> {
        Set<Text> set = new HashSet<Text>();
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\\s+");
            if (fields.length != 4) {
                return;
            }
            set.add(new Text(fields[0]));
        }
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text value : set) {
                context.write(value, NullWritable.get());
            }
        }
    }
    public static class RemoveDuplicatesReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "RemoveDuplicates");
        job.setJarByClass(RemoveDuplicates.class);
        job.setMapperClass(RemoveDuplicatesMapper.class);
        job.setReducerClass(RemoveDuplicatesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
/**
* 连接,除了去除不在右表中的User外与ReduceJoin一样
**/
public class SemiJoin {
    public static class SemiJoinMapper extends Mapper<Object,Text,Text,Text>{
        Set<String> set = new HashSet<String>();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new FileReader("UniqueUsers"));
            String line = null;
            while((line = br.readLine()) != null){
                if(!line.trim().equals("")){
                    set.add(line.trim());
                }
            }
            br.close();
        }
        @Override
        protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
            FileSplit split = (FileSplit)context.getInputSplit();
            String path = split.getPath().toString();
            String line = value.toString();
            String[] fields = line.split("\\s+");
            if(path.contains("t1")){
                if(fields.length!=2){
                    return;
                }
                if(set.contains(fields[0])){
                    context.write(new Text(fields[0]), new Text("u#"+fields[1]));
                }
            }else if(path.contains("t2")){
                if(fields.length!=4){
                    return;
                }
                context.write(new Text(fields[0]), new Text("l#"+fields[2]+"\t"+fields[3]));
            }else{
                context.getCounter("MapStage","Invalid Records").increment(1);
            }
        }
    }

    public static class SemiJoinReducer extends Reducer<Text,Text,Text,Text>{
        private List<String> listT1 = new ArrayList<String>();
        private List<String> listT2    = new ArrayList<String>();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            for(Text value:values){
                String[] fields = value.toString().split("#");
                if("u".equals(fields[0])){
                    listT1.add(fields[1]);
                }
                if("l".equals(fields[0])){
                    listT2.add(fields[1]);
                }
            }
            for(String t1:listT1){
                for(String t2:listT2){
                    context.write(key, new Text(t1+"\t"+t2));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        DistributedCache.addCacheFile(new URI("/user/zhanghu/cache/UniqueUsers#UniqueUsers"),conf);
        Job job = new Job(conf,"SemiJoin");
        job.setJarByClass(SemiJoin.class);
        job.setMapperClass(SemiJoinMapper.class);
        job.setReducerClass(SemiJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

改进方案

  • 第二步中还是用到了ReduceJoin所以还是需要传输较多数据
  • 前提经过过滤后的用户表可以被全部放入到cache中
  • 实现方案
    • 对右表中的UserID字段进行去重,保存在UniquUsers中
    • 以UniqueUsers作为cache对Users表进行过滤,得到FilteredUsers
    • 以FiltereddUsers作为cache,与UserLog进行Map端连接
  • 改进方案的特点
    • 优点:三个步骤全部只有Map,没有Shuffle阶段,完全并行
    • 缺点:需要启动三个作业,且要多次读入Cache,如果Cache比较大得不偿失
目录
相关文章
|
3月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
55 3
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
缓存 分布式计算 Hadoop
hadoop之Map join和Reduce join (13)
hadoop之Map join和Reduce join (13)
151 0
hadoop之Map join和Reduce join (13)
|
分布式计算 Hadoop Java
|
3月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
204 6

相关实验场景

更多