Hadoop MapReduce 保姆级吐血宝典,学习与面试必读此文!(一)

简介: Hadoop MapReduce 宝典

Hadoop 涉及的知识点如下图所示,本文将逐一讲解:


image.png


本文档参考了关于 Hadoop 的官网及其他众多资料整理而成,为了整洁的排版及舒适的阅读,对于模糊不清晰的图片及黑白图片进行重新绘制成了高清彩图。


目前企业应用较多的是Hadoop2.x,所以本文是以Hadoop2.x为主,对于Hadoop3.x新增的内容会进行说明!


二、MapReduce



1. MapReduce 介绍


MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。


  • Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。


  • Reduce负责“合”,即对map阶段的结果进行全局汇总。


  • MapReduce运行在yarn集群


  1. ResourceManager
  2. NodeManager


这两个阶段合起来正是MapReduce思想的体现。


image.png


还有一个比较形象的语言解释MapReduce:


我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。


现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。


1.1 MapReduce 设计构思


MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。


既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。


对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。


Hadoop MapReduce构思体现在如下的三个方面:


  1. 如何对付大数据处理:分而治之


对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!


  1. 构建抽象模型:Map和Reduce


MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。


Map: 对一组数据元素进行某种重复式的处理;


Reduce: 对Map的中间结果进行某种进一步的结果整理。


MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:

map: (k1; v1) → [(k2; v2)]

reduce: (k2; [v2]) → [(k3; v3)]


Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。


  1. MapReduce框架结构


一个完整的mapreduce程序在分布式运行时有三类实例进程:


  • MR AppMaster:负责整个程序的过程调度及状态协调;
  • MapTask:负责map阶段的整个数据处理流程;
  • ReduceTask:负责reduce阶段的整个数据处理流程。


image.png


2. MapReduce 编程规范


MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为 2 个步骤


  1. Map 阶段 2 个步骤:


  • 1.1 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
  • 1.2 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果


  1. Shuffle 阶段 4 个步骤:


  • 2.1 对输出的 Key-Value 对进行分区
  • 2.2 对不同分区的数据按照相同的 Key 排序
  • 2.3 (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
  • 2.4 对数据进行分组, 相同 Key 的 Value 放入一个集合中


  1. Reduce 阶段 2 个步骤:


  • 3.1 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
  • 3.2 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据


image.png


3. Mapper以及Reducer抽象类介绍


为了开发我们的MapReduce程序,一共可以分为以上八个步骤,其中每个步骤都是一个class类,我们通过job对象将我们的程序组装成一个任务提交即可。为了简化我们的MapReduce程序的开发,每一个步骤的class类,都有一个既定的父类,让我们直接继承即可,因此可以大大简化我们的MapReduce程序的开发难度,也可以让我们快速的实现功能开发。


MapReduce编程当中,其中最重要的两个步骤就是我们的Mapper类和Reducer类


  1. Mapper抽象类的基本介绍


在hadoop2.x当中Mapper类是一个抽象类,我们只需要覆写一个java类,继承自Mapper类即可,然后重写里面的一些方法,就可以实现我们特定的功能,接下来我们来介绍一下Mapper类当中比较重要的四个方法


  1. setup方法
    我们Mapper类当中的初始化方法,我们一些对象的初始化工作都可以放到这个方法里面来实现


  1. map方法
    读取的每一行数据,都会来调用一次map方法,这个方法也是我们最重要的方法,可以通过这个方法来实现我们每一条数据的处理


  1. cleanup方法
    在我们整个maptask执行完成之后,会马上调用cleanup方法,这个方法主要是用于做我们的一些清理工作,例如连接的断开,资源的关闭等等


  1. run方法
    如果我们需要更精细的控制我们的整个MapTask的执行,那么我们可以覆写这个方法,实现对我们所有的MapTask更精确的操作控制


  1. Reducer抽象类基本介绍

同样的道理,在我们的hadoop2.x当中,reducer类也是一个抽象类,抽象类允许我们可以继承这个抽象类之后,重新覆写抽象类当中的方法,实现我们的逻辑的自定义控制。


接下来我们也来介绍一下Reducer抽象类当中的四个抽象方法


  1. setup方法
    在我们的ReduceTask初始化之后马上调用,我们的一些对象的初始化工作,都可以在这个类当中实现


  1. reduce方法
    所有从MapTask发送过来的数据,都会调用reduce方法,这个方法也是我们reduce当中最重要的方法,可以通过这个方法实现我们的数据的处理


  1. cleanup方法
    在我们整个ReduceTask执行完成之后,会马上调用cleanup方法,这个方法主要就是在我们reduce阶段处理做我们一些清理工作,例如连接的断开,资源的关闭等等


  1. run方法
    如果我们需要更精细的控制我们的整个ReduceTask的执行,那么我们可以覆写这个方法,实现对我们所有的ReduceTask更精确的操作控制


4. WordCount示例编写


需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数


node01服务器执行以下命令,准备数,数据格式准备如下:


cd /export/servers
vim wordcount.txt
#添加以下内容:
hello hello
world world
hadoop hadoop
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive oozie

将数据文件上传到hdfs上面去


hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/


  • 定义一个mapper类


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// mapper程序:  需要继承 mapper类, 需要传入 四个类型:
/*  在hadoop中, 对java的类型都进行包装, 以提高传输的效率  writable
    keyin :  k1   Long     ---- LongWritable
    valin :  v1   String   ------ Text
    keyout : k2   String   ------- Text
    valout : v2   Long     -------LongWritable
 */
public class MapTask extends Mapper<LongWritable,Text,Text,LongWritable> {
    /**
     *
     * @param key  : k1
     * @param value   v1
     * @param context  上下文对象   承上启下功能
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. 获取 v1 中数据
        String val = value.toString();
        //2. 切割数据
        String[] words = val.split(" ");
        Text text = new Text();
        LongWritable longWritable = new LongWritable(1);
        //3. 遍历循环, 发给 reduce
        for (String word : words) {
            text.set(word);
            context.write(text,longWritable);
        }
    }
}


  • 定义一个reducer类


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * KEYIN  :  k2    -----Text
 * VALUEIN :  v2   ------LongWritable
 * KEYOUT  : k3    ------  Text
 * VALUEOUT : v3   ------ LongWritable
 */
public class ReducerTask extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //1. 遍历 values 获取每一个值
        long  v3 = 0;
        for (LongWritable longWritable : values) {
            v3 += longWritable.get();  //1
        }
        //2. 输出
        context.write(key,new LongWritable(v3));
    }
}


  • 定义一个主类,用来描述job并提交job


import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// 任务的执行入口: 将八步组合在一起
public class JobMain extends Configured implements Tool {
    // 在run方法中编写组装八步
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "JobMain");
        //如果提交到集群操作. 需要添加一步 : 指定入口类
        job.setJarByClass(JobMain.class);
        //1. 封装第一步:  读取数据
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount.txt"));
        //2. 封装第二步:  自定义 map程序
        job.setMapperClass(MapTask.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //3. 第三步 第四步 第五步 第六步 省略
        //4. 第七步:  自定义reduce程序
        job.setReducerClass(ReducerTask.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //5) 第八步  : 输出路径是一个目录, 而且这个目录必须不存在的
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/output"));
        //6) 提交任务:
        boolean flag = job.waitForCompletion(true); // 成功  true  不成功 false
        return flag ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        JobMain jobMain = new JobMain();
        int i = ToolRunner.run(configuration, jobMain, args); //返回值 退出码
        System.exit(i); // 退出程序  0 表示正常  其他值表示有异常 1
    }
}


提醒:代码开发完成之后,就可以打成jar包放到服务器上面去运行了,实际工作当中,都是将代码打成jar包,开发main方法作为程序的入口,然后放到集群上面去运行


5. MapReduce程序运行模式


  • 本地运行模式


  1. mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行
  2. 而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
  3. 怎样实现本地运行?写一个程序,不要带集群的配置文件本质是程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local参数
  4. 本地模式非常便于进行业务逻辑的debug,只要在idea中打断点即可


【本地模式运行代码设置】


configuration.set("mapreduce.framework.name","local");
configuration.set("yarn.resourcemanager.hostname","local");
-----------以上两个是不需要修改的,如果要在本地目录测试, 可有修改hdfs的路径-----------------
TextInputFormat.addInputPath(job,new Path("file:///D:\\wordcount\\input"));
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount\\output"));


  • 集群运行模式


  1. 将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行


  1. 处理的数据和输出结果应该位于hdfs文件系统


  1. 提交集群的实现步骤:


将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain

相关文章
|
3月前
|
存储 缓存 前端开发
No198.精选前端面试题,享受每天的挑战和学习
No198.精选前端面试题,享受每天的挑战和学习
|
3月前
|
前端开发 JavaScript API
No196.精选前端面试题,享受每天的挑战和学习
No196.精选前端面试题,享受每天的挑战和学习
|
3月前
|
存储 JSON 前端开发
No206.精选前端面试题,享受每天的挑战和学习
No206.精选前端面试题,享受每天的挑战和学习
No206.精选前端面试题,享受每天的挑战和学习
|
19天前
|
分布式计算 资源调度 监控
Hadoop生态系统深度剖析:面试经验与必备知识点解析
本文深入探讨了Hadoop生态系统的面试重点,涵盖Hadoop架构、HDFS、YARN和MapReduce。了解Hadoop的主从架构、HDFS的读写流程及高级特性,YARN的资源管理与调度,以及MapReduce编程模型。通过代码示例,如HDFS文件操作和WordCount程序,帮助读者巩固理解。此外,文章强调在面试中应结合个人经验、行业动态和技术进展展示技术实力。
|
1月前
|
网络协议
跟着动画学习TCP三次握手和四次挥手,及全部面试题
跟着动画学习TCP三次握手和四次挥手,及全部面试题
35 0
|
2月前
|
存储 缓存 NoSQL
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
452 5
揭秘一线大厂Redis面试高频考点(3万字长文、吐血整理)
|
3月前
|
存储 前端开发 JavaScript
No204.精选前端面试题,享受每天的挑战和学习
No204.精选前端面试题,享受每天的挑战和学习
|
3月前
|
前端开发 UED
No203.精选前端面试题,享受每天的挑战和学习
No203.精选前端面试题,享受每天的挑战和学习
|
3月前
|
前端开发 JavaScript
No201.精选前端面试题,享受每天的挑战和学习
No201.精选前端面试题,享受每天的挑战和学习
|
3月前
|
存储 前端开发 开发者
No199.精选前端面试题,享受每天的挑战和学习
No199.精选前端面试题,享受每天的挑战和学习
No199.精选前端面试题,享受每天的挑战和学习