Hadoop 涉及的知识点如下图所示,本文将逐一讲解:
本文档参考了关于 Hadoop 的官网及其他众多资料整理而成,为了整洁的排版及舒适的阅读,对于模糊不清晰的图片及黑白图片进行重新绘制成了高清彩图。
目前企业应用较多的是Hadoop2.x,所以本文是以Hadoop2.x为主,对于Hadoop3.x新增的内容会进行说明!
二、MapReduce
1. MapReduce 介绍
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
- Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- Reduce负责“合”,即对map阶段的结果进行全局汇总。
- MapReduce运行在yarn集群
- ResourceManager
- NodeManager
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
1.1 MapReduce 设计构思
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
既然是做计算的框架,那么表现形式就是有个输入(input),MapReduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output)。
对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。
Hadoop MapReduce构思体现在如下的三个方面:
- 如何对付大数据处理:分而治之
对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
- 构建抽象模型:Map和Reduce
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。
- MapReduce框架结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
- MR AppMaster:负责整个程序的过程调度及状态协调;
- MapTask:负责map阶段的整个数据处理流程;
- ReduceTask:负责reduce阶段的整个数据处理流程。
2. MapReduce 编程规范
MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为 2 个步骤
- Map 阶段 2 个步骤:
- 1.1 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
- 1.2 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
- Shuffle 阶段 4 个步骤:
- 2.1 对输出的 Key-Value 对进行分区
- 2.2 对不同分区的数据按照相同的 Key 排序
- 2.3 (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
- 2.4 对数据进行分组, 相同 Key 的 Value 放入一个集合中
- Reduce 阶段 2 个步骤:
- 3.1 对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
- 3.2 设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
3. Mapper以及Reducer抽象类介绍
为了开发我们的MapReduce程序,一共可以分为以上八个步骤,其中每个步骤都是一个class类,我们通过job对象将我们的程序组装成一个任务提交即可。为了简化我们的MapReduce程序的开发,每一个步骤的class类,都有一个既定的父类,让我们直接继承即可,因此可以大大简化我们的MapReduce程序的开发难度,也可以让我们快速的实现功能开发。
MapReduce编程当中,其中最重要的两个步骤就是我们的Mapper类和Reducer类
- Mapper抽象类的基本介绍
在hadoop2.x当中Mapper类是一个抽象类,我们只需要覆写一个java类,继承自Mapper类即可,然后重写里面的一些方法,就可以实现我们特定的功能,接下来我们来介绍一下Mapper类当中比较重要的四个方法
- setup方法:
我们Mapper类当中的初始化方法,我们一些对象的初始化工作都可以放到这个方法里面来实现
- map方法:
读取的每一行数据,都会来调用一次map方法,这个方法也是我们最重要的方法,可以通过这个方法来实现我们每一条数据的处理
- cleanup方法:
在我们整个maptask执行完成之后,会马上调用cleanup方法,这个方法主要是用于做我们的一些清理工作,例如连接的断开,资源的关闭等等
- run方法:
如果我们需要更精细的控制我们的整个MapTask的执行,那么我们可以覆写这个方法,实现对我们所有的MapTask更精确的操作控制
- Reducer抽象类基本介绍
同样的道理,在我们的hadoop2.x当中,reducer类也是一个抽象类,抽象类允许我们可以继承这个抽象类之后,重新覆写抽象类当中的方法,实现我们的逻辑的自定义控制。
接下来我们也来介绍一下Reducer抽象类当中的四个抽象方法
- setup方法:
在我们的ReduceTask初始化之后马上调用,我们的一些对象的初始化工作,都可以在这个类当中实现
- reduce方法:
所有从MapTask发送过来的数据,都会调用reduce方法,这个方法也是我们reduce当中最重要的方法,可以通过这个方法实现我们的数据的处理
- cleanup方法:
在我们整个ReduceTask执行完成之后,会马上调用cleanup方法,这个方法主要就是在我们reduce阶段处理做我们一些清理工作,例如连接的断开,资源的关闭等等
- run方法:
如果我们需要更精细的控制我们的整个ReduceTask的执行,那么我们可以覆写这个方法,实现对我们所有的ReduceTask更精确的操作控制
4. WordCount示例编写
需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
node01服务器执行以下命令,准备数,数据格式准备如下:
cd /export/servers vim wordcount.txt #添加以下内容: hello hello world world hadoop hadoop hello world hello flume hadoop hive hive kafka flume storm hive oozie
将数据文件上传到hdfs上面去
hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/
- 定义一个mapper类
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; // mapper程序: 需要继承 mapper类, 需要传入 四个类型: /* 在hadoop中, 对java的类型都进行包装, 以提高传输的效率 writable keyin : k1 Long ---- LongWritable valin : v1 String ------ Text keyout : k2 String ------- Text valout : v2 Long -------LongWritable */ public class MapTask extends Mapper<LongWritable,Text,Text,LongWritable> { /** * * @param key : k1 * @param value v1 * @param context 上下文对象 承上启下功能 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1. 获取 v1 中数据 String val = value.toString(); //2. 切割数据 String[] words = val.split(" "); Text text = new Text(); LongWritable longWritable = new LongWritable(1); //3. 遍历循环, 发给 reduce for (String word : words) { text.set(word); context.write(text,longWritable); } } }
- 定义一个reducer类
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * KEYIN : k2 -----Text * VALUEIN : v2 ------LongWritable * KEYOUT : k3 ------ Text * VALUEOUT : v3 ------ LongWritable */ public class ReducerTask extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //1. 遍历 values 获取每一个值 long v3 = 0; for (LongWritable longWritable : values) { v3 += longWritable.get(); //1 } //2. 输出 context.write(key,new LongWritable(v3)); } }
- 定义一个主类,用来描述job并提交job
import com.sun.org.apache.bcel.internal.generic.NEW; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; // 任务的执行入口: 将八步组合在一起 public class JobMain extends Configured implements Tool { // 在run方法中编写组装八步 @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(super.getConf(), "JobMain"); //如果提交到集群操作. 需要添加一步 : 指定入口类 job.setJarByClass(JobMain.class); //1. 封装第一步: 读取数据 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount.txt")); //2. 封装第二步: 自定义 map程序 job.setMapperClass(MapTask.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //3. 第三步 第四步 第五步 第六步 省略 //4. 第七步: 自定义reduce程序 job.setReducerClass(ReducerTask.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //5) 第八步 : 输出路径是一个目录, 而且这个目录必须不存在的 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/output")); //6) 提交任务: boolean flag = job.waitForCompletion(true); // 成功 true 不成功 false return flag ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); JobMain jobMain = new JobMain(); int i = ToolRunner.run(configuration, jobMain, args); //返回值 退出码 System.exit(i); // 退出程序 0 表示正常 其他值表示有异常 1 } }
提醒:代码开发完成之后,就可以打成jar包放到服务器上面去运行了,实际工作当中,都是将代码打成jar包,开发main方法作为程序的入口,然后放到集群上面去运行
5. MapReduce程序运行模式
- 本地运行模式
- mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行
- 而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
- 怎样实现本地运行?写一个程序,不要带集群的配置文件本质是程序的conf中是否有
mapreduce.framework.name=local
以及yarn.resourcemanager.hostname=local
参数 - 本地模式非常便于进行业务逻辑的debug,只要在idea中打断点即可
【本地模式运行代码设置】
configuration.set("mapreduce.framework.name","local"); configuration.set("yarn.resourcemanager.hostname","local"); -----------以上两个是不需要修改的,如果要在本地目录测试, 可有修改hdfs的路径----------------- TextInputFormat.addInputPath(job,new Path("file:///D:\\wordcount\\input")); TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount\\output"));
- 集群运行模式
- 将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行
- 处理的数据和输出结果应该位于hdfs文件系统
- 提交集群的实现步骤:
将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain