32 MAPREDUCE的map端join算法实现

简介: 32 MAPREDUCE的map端join算法实现
原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。

实现示例

1.在mapper类中预先定义好小表,进行join

2.引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {
  static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
    FileReader in = null;
    BufferedReader reader = null;
    HashMap<String,String> b_tab = new HashMap<String, String>();
    String localpath =null;
    String uirpath = null;
    //是在map任务初始化的时候调用一次
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
      Path[] files = context.getLocalCacheFiles();
      localpath = files[0].toString();
      URI[] cacheFiles = context.getCacheFiles();
      //缓存文件的用法——直接用本地IO来读取
      //这里读的数据是map task所在机器本地工作目录中的一个小文件
      in = new FileReader("b.txt");
      reader =new BufferedReader(in);
      String line =null;
      while(null!=(line=reader.readLine())){
        String[] fields = line.split(",");
        b_tab.put(fields[0],fields[1]);
      }
      IOUtils.closeStream(reader);
      IOUtils.closeStream(in);
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      //这里读的是这个map task所负责的那一个切片数据(在hdfs上)
       String[] fields = value.toString().split("\t");
       String a_itemid = fields[0];
       String a_amount = fields[1];
       String b_name = b_tab.get(a_itemid);
       // 输出结果  1001  98.9  banan
       context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TestDistributedCache.class);
    job.setMapperClass(TestDistributedCacheMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    //这里是我们正常的需要处理的数据所在路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //不需要reducer
    job.setNumReduceTasks(0);
    //分发一个文件到task进程的工作目录
    job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
    //分发一个归档文件到task进程的工作目录
//    job.addArchiveToClassPath(archive);
    //分发jar包到task节点的classpath下
//    job.addFileToClassPath(jarfile);
    job.waitForCompletion(true);
  }
}


目录
相关文章
|
6月前
|
SQL 算法 关系型数据库
深入理解MySQL中的Join算法
在数据库处理中,Join操作是最基本且最重要的操作之一,它能将不同的表连接起来,实现对数据集的更深层次分析。
341 8
深入理解MySQL中的Join算法
|
2月前
|
算法 测试技术 C++
【动态规划】【map】【C++算法】1289. 下降路径最小和 II
【动态规划】【map】【C++算法】1289. 下降路径最小和 II
|
3月前
|
算法 测试技术 C#
【map】【滑动窗口】【字典树】C++算法:最长合法子字符串的长度
【map】【滑动窗口】【字典树】C++算法:最长合法子字符串的长度
|
3月前
|
算法 测试技术 C#
【map】【滑动窗口】C++算法:最小区间
【map】【滑动窗口】C++算法:最小区间
|
3月前
|
分布式计算
MapReduce中的Map和Reduce函数分别是什么作用?
MapReduce中的Map和Reduce函数分别是什么作用?
42 0
|
3月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
32 0
|
8月前
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
5月前
|
存储 SQL 分布式计算
31 MAPREDUCE的reduce端join算法实现
31 MAPREDUCE的reduce端join算法实现
19 0
|
8月前
|
SQL 算法 关系型数据库
MySQL中的Join 的算法(NLJ、BNL、BKA)
MySQL中的Join 的算法(NLJ、BNL、BKA)
149 0
|
8月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)