32 MAPREDUCE的map端join算法实现

简介: 32 MAPREDUCE的map端join算法实现
原理阐述

适用于关联表中有小表的情形;

可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。

实现示例

1.在mapper类中预先定义好小表,进行join

2.引入实际场景中的解决方案:一次加载数据库或者用distributedcache

public class TestDistributedCache {
  static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
    FileReader in = null;
    BufferedReader reader = null;
    HashMap<String,String> b_tab = new HashMap<String, String>();
    String localpath =null;
    String uirpath = null;
    //是在map任务初始化的时候调用一次
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
      Path[] files = context.getLocalCacheFiles();
      localpath = files[0].toString();
      URI[] cacheFiles = context.getCacheFiles();
      //缓存文件的用法——直接用本地IO来读取
      //这里读的数据是map task所在机器本地工作目录中的一个小文件
      in = new FileReader("b.txt");
      reader =new BufferedReader(in);
      String line =null;
      while(null!=(line=reader.readLine())){
        String[] fields = line.split(",");
        b_tab.put(fields[0],fields[1]);
      }
      IOUtils.closeStream(reader);
      IOUtils.closeStream(in);
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      //这里读的是这个map task所负责的那一个切片数据(在hdfs上)
       String[] fields = value.toString().split("\t");
       String a_itemid = fields[0];
       String a_amount = fields[1];
       String b_name = b_tab.get(a_itemid);
       // 输出结果  1001  98.9  banan
       context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TestDistributedCache.class);
    job.setMapperClass(TestDistributedCacheMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    //这里是我们正常的需要处理的数据所在路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //不需要reducer
    job.setNumReduceTasks(0);
    //分发一个文件到task进程的工作目录
    job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
    //分发一个归档文件到task进程的工作目录
//    job.addArchiveToClassPath(archive);
    //分发jar包到task节点的classpath下
//    job.addFileToClassPath(jarfile);
    job.waitForCompletion(true);
  }
}


目录
相关文章
|
6月前
|
算法 5G
基于LDPC编译码和FP-MAP球形检测算法的协作MIMO系统误码率matlab仿真
基于LDPC编译码和FP-MAP球形检测算法的协作MIMO系统误码率matlab仿真
|
1月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
31 3
|
6月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
93 0
|
4月前
|
人工智能 算法 大数据
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
这篇内容介绍了编程中避免使用 for 循环的一些方法,特别是针对 Python 语言。它强调了 for 循环在处理大数据或复杂逻辑时可能导致的性能、可读性和复杂度问题。
51 6
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
|
4月前
|
监控 前端开发 SQL
ODPS SQL问题之在何种情况下建议使用Distributed Map Join
ODPS SQL问题之在何种情况下建议使用Distributed Map Join
|
5月前
|
算法 关系型数据库 MySQL
深入理解MySQL中的JOIN算法
深入理解MySQL中的JOIN算法
|
6月前
|
Python
Python内置函数map、split、join讲解
Python内置函数map、split、join讲解
118 0
|
6月前
|
算法 测试技术 C++
【动态规划】【map】【C++算法】1289. 下降路径最小和 II
【动态规划】【map】【C++算法】1289. 下降路径最小和 II
|
6月前
|
算法 测试技术 C#
【map】【滑动窗口】【字典树】C++算法:最长合法子字符串的长度
【map】【滑动窗口】【字典树】C++算法:最长合法子字符串的长度
|
6月前
|
算法 测试技术 C#
【map】【滑动窗口】C++算法:最小区间
【map】【滑动窗口】C++算法:最小区间