37 MAPREDUCE中的DistributedCache应用

简介: 37 MAPREDUCE中的DistributedCache应用
map端join案例需求

实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志” join “产品信息”。

分析

原理阐述:

  • 适用于关联表中有小表的情形;
  • 可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果
  • 可以大大提高join操作的并发度,加快处理速度

示例:

先在mapper类中预先定义好小表,进行join

。并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join。

实现
public class TestDistributedCache {
  static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
    FileReader in = null;
    BufferedReader reader = null;
    HashMap<String,String> b_tab = new HashMap<String, String>();
    String localpath =null;
    String uirpath = null;
    //是在map任务初始化的时候调用一次
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
      Path[] files = context.getLocalCacheFiles();
      localpath = files[0].toString();
      URI[] cacheFiles = context.getCacheFiles();
      //缓存文件的用法——直接用本地IO来读取
      //这里读的数据是map task所在机器本地工作目录中的一个小文件
      in = new FileReader("b.txt");
      reader =new BufferedReader(in);
      String line =null;
      while(null!=(line=reader.readLine())){
        String[] fields = line.split(",");
        b_tab.put(fields[0],fields[1]);
      }
      IOUtils.closeStream(reader);
      IOUtils.closeStream(in);
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      //这里读的是这个map task所负责的那一个切片数据(在hdfs上)
       String[] fields = value.toString().split("\t");
       String a_itemid = fields[0];
       String a_amount = fields[1];
       String b_name = b_tab.get(a_itemid);
       // 输出结果  1001  98.9  banan
       context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(TestDistributedCache.class);
    job.setMapperClass(TestDistributedCacheMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    //这里是我们正常的需要处理的数据所在路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //不需要reducer
    job.setNumReduceTasks(0);
    //分发一个文件到task进程的工作目录
    job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
    //分发一个归档文件到task进程的工作目录
//    job.addArchiveToClassPath(archive);
    //分发jar包到task节点的classpath下
//    job.addFileToClassPath(jarfile);
    job.waitForCompletion(true);
  }
}


目录
相关文章
|
分布式计算 数据处理
38 MAPREDUCE中的其他应用
38 MAPREDUCE中的其他应用
91 0
|
4月前
|
分布式计算 并行计算 算法
MapReduce在实现PageRank算法中的应用
总结来说,在实现PageRank算法时使用MapReduce能够有效地进行大规模并行计算,并且具有良好的容错性和可扩展性。
178 76
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
机器学习/深度学习 分布式计算 监控
Hadoop生态系统中的数据处理技术:MapReduce的原理与应用
Hadoop生态系统中的数据处理技术:MapReduce的原理与应用
|
存储 编解码 分布式计算
云计算与大数据实验六 MapReduce综合应用
云计算与大数据实验六 MapReduce综合应用
425 0
|
存储 分布式计算 监控
|
9月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
235 3

热门文章

最新文章