Hadoop MapReduce 保姆级吐血宝典,学习与面试必读此文!(三)

简介: Hadoop MapReduce 宝典

8. Map端实现 JOIN


8.1 概述


适用于关联表中有小表的情形.


使用分布式缓存,可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度


8.2 实现步骤


先在mapper类中预先定义好小表,进行join

引入实际场景中的解决方案:一次加载数据库


  1. 定义Mapper


import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapperTask extends Mapper<LongWritable, Text, Text, Text> {
    private Map<String,String> map = new HashMap<>();
    // 初始化的方法, 只会被初始化一次
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        URI fileURI = cacheFiles[0];
        FileSystem fs = FileSystem.get(fileURI, context.getConfiguration());
        FSDataInputStream inputStream = fs.open(new Path(fileURI));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine  ="";
        while ((readLine = bufferedReader.readLine() ) != null  ) {
            // readlLine:  product一行数据
            String[] split = readLine.split(",");
            String pid = split[0];
            map.put(pid,split[1]+"\t"+split[2]+"\t"+split[3]);
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. 读取一行数据: orders数据
        String line = value.toString();
        //2. 切割
        String[] split = line.split(",");
        String pid = split[2];
        //3. 到map中获取商品信息:
        String product = map.get(pid);
        //4. 发送给reduce: 输出
        context.write(new Text(pid),new Text(split[0]+"\t"+split[1]+"\t"+product +"\t"+split[3]));
    }
}


  1. 定义主类


import com.itheima.join.reduce.JobReduceJoinMain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMapperJoinMain extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
        //设置缓存的位置, 必须在run的方法的最前, 如果放置在job任务创建后, 将无效
        // 缓存文件的路径, 必须存储在hdfs上, 否则也是无效的
        DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache/pdts.txt"),super.getConf());
        //1. 获取job 任务
        Job job = Job.getInstance(super.getConf(), "jobMapperJoinMain");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("E:\\传智工作\\上课\\北京大数据30期\\大数据第六天\\资料\\map端join\\map_join_iput"));
        job.setMapperClass(MapperTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("E:\\传智工作\\上课\\北京大数据30期\\大数据第六天\\资料\\map端join\\out_put_map"));
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        JobMapperJoinMain jobMapperJoinMain = new JobMapperJoinMain();
        int i = ToolRunner.run(conf, jobMapperJoinMain, args);
        System.exit(i);
    }
}


9. 社交粉丝数据分析


9.1 需求分析


以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)


A:B,C,D,F,E,O
B:A,C,E,K
C:A,B,D,E,I 
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J


求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?


【解题思路】


第一步  
map
读一行   A:B,C,D,F,E,O
输出    <B,A><C,A><D,A><F,A><E,A><O,A>
在读一行   B:A,C,E,K
输出   <A,B><C,B><E,B><K,B>
REDUCE
拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......
输出:  
<A-B,C>
<A-E,C>
<A-F,C>
<A-G,C>
<B-E,C>
<B-F,C>.....
第二步
map
读入一行<A-B,C>
直接输出<A-B,C>
reduce
读入数据  <A-B,C><A-B,F><A-B,G>.......
输出: A-B  C,F,G,.....


9.2 实现步骤


第一个MapReduce代码实现


【Mapper类】


public class Step1Mapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         //1:以冒号拆分行文本数据: 冒号左边就是V2
        String[] split = value.toString().split(":");
        String userStr = split[0];
        //2:将冒号右边的字符串以逗号拆分,每个成员就是K2
        String[] split1 = split[1].split(",");
        for (String s : split1) {
            //3:将K2和v2写入上下文中
            context.write(new Text(s), new Text(userStr));
        }
    }
}


【Reducer类】


public class Step1Reducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1:遍历集合,并将每一个元素拼接,得到K3
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        //2:K2就是V3
        //3:将K3和V3写入上下文中
        context.write(new Text(buffer.toString()), key);
    }
}


JobMain:


public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1:获取Job对象
        Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");
        //2:设置job任务
            //第一步:设置输入类和输入路径
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\common_friends_step1_input"));
            //第二步:设置Mapper类和数据类型
            job.setMapperClass(Step1Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //第三,四,五,六
            //第七步:设置Reducer类和数据类型
            job.setReducerClass(Step1Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //第八步:设置输出类和输出的路径
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));
        //3:等待job任务结束
        boolean bl = job.waitForCompletion(true);
        return bl ? 0: 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}


第二个MapReduce代码实现


【Mapper类】


public class Step2Mapper extends Mapper<LongWritable,Text,Text,Text> {
    /*
     K1           V1
     0            A-F-C-J-E-  B
    ----------------------------------
     K2             V2
     A-C            B
     A-E            B
     A-F            B
     C-E            B
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:拆分行文本数据,结果的第二部分可以得到V2
        String[] split = value.toString().split("\t");
        String   friendStr =split[1];
        //2:继续以'-'为分隔符拆分行文本数据第一部分,得到数组
        String[] userArray = split[0].split("-");
        //3:对数组做一个排序
        Arrays.sort(userArray);
        //4:对数组中的元素进行两两组合,得到K2
        /*
          A-E-C ----->  A  C  E
          A  C  E
            A  C  E
         */
        for (int i = 0; i <userArray.length -1 ; i++) {
            for (int j = i+1; j  < userArray.length ; j++) {
                //5:将K2和V2写入上下文中
                context.write(new Text(userArray[i] +"-"+userArray[j]), new Text(friendStr));
            }
        }
    }
}


【Reducer类】


public class Step2Reducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1:原来的K2就是K3
        //2:将集合进行遍历,将集合中的元素拼接,得到V3
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        //3:将K3和V3写入上下文中
        context.write(key, new Text(buffer.toString()));
    }
}


【JobMain】


public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1:获取Job对象
        Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");
        //2:设置job任务
            //第一步:设置输入类和输入路径
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));
            //第二步:设置Mapper类和数据类型
            job.setMapperClass(Step2Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //第三,四,五,六
            //第七步:设置Reducer类和数据类型
            job.setReducerClass(Step2Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //第八步:设置输出类和输出的路径
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step2_out"));
        //3:等待job任务结束
        boolean bl = job.waitForCompletion(true);
        return bl ? 0: 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //启动job任务
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}


10. 倒排索引建立


10.1 需求分析


需求:有大量的文本(文档、网页),需要建立搜索索引


思路分析:


首选将文档的内容全部读取出来,加上文档的名字作为key,文档的value为1,组织成这样的一种形式的数据


map端数据输出:


hello-a.txt  1
hello-a.txt 1
hello-a.txt 1


reduce端数据输出:


hello-a.txt 3


10.2 代码实现


public class IndexCreate extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new IndexCreate(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), IndexCreate.class.getSimpleName());
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///D:\\倒排索引\\input"));
        job.setMapperClass(IndexCreateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(IndexCreateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\倒排索引\\outindex"));
        boolean bool = job.waitForCompletion(true);
        return bool?0:1;
    }
    public static class IndexCreateMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        Text text = new Text();
        IntWritable v = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取文件切片
            FileSplit fileSplit  = (FileSplit) context.getInputSplit();
            //通过文件切片获取文件名
            String name = fileSplit.getPath().getName();
            String line = value.toString();
            String[] split = line.split(" ");
            //输出 单词--文件名作为key  value是1
            for (String word : split) {
               text.set(word+"--"+name);
                context.write(text,v);
            }
        }
    }
    public static class IndexCreateReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        IntWritable value = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            value.set(count);
            context.write(key,value);
        }
    }
}
相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
40 1
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
62 0
|
18天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
69 2
|
16天前
|
分布式计算 Hadoop 大数据
Hadoop【hadoop学习大纲完全总结01+02+03+04+05】【自学阶段整理的xmind思维导图分享】【点击可放大看高清】
【4月更文挑战第5天】Hadoop【hadoop学习大纲完全总结01+02+03+04+05】【自学阶段整理的xmind思维导图分享】【点击可放大看高清】
37 5
|
18天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
55 0
|
19天前
|
分布式计算 资源调度 监控
Hadoop生态系统深度剖析:面试经验与必备知识点解析
本文深入探讨了Hadoop生态系统的面试重点,涵盖Hadoop架构、HDFS、YARN和MapReduce。了解Hadoop的主从架构、HDFS的读写流程及高级特性,YARN的资源管理与调度,以及MapReduce编程模型。通过代码示例,如HDFS文件操作和WordCount程序,帮助读者巩固理解。此外,文章强调在面试中应结合个人经验、行业动态和技术进展展示技术实力。
|
2月前
|
存储 缓存 NoSQL
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
452 5
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
|
3月前
|
存储 分布式计算 监控
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
Hadoop的JobTracker和TaskTracker在MapReduce中的作用是什么?
56 0
|
4月前
|
存储 分布式计算 资源调度
2022年Hadoop面试题最全整理,两万字干货分享【建议收藏】
2022年Hadoop面试题最全整理,两万字干货分享【建议收藏】
142 0
|
4月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
268 0