bigdata-08-MapReduce原理到实战

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: bigdata-08-MapReduce原理到实战

2.8-MapReduce

Hadoop中Hdfs负责存储,Yarn负责资源管理,负责计算的框架,名为MapReduce,仅仅存储数据是毫无意义的,数据的价值在于运算,在海量数据运算中,我们就能挖掘数据的价值。

通过讲数据分发到不同节点进行计算,最后再汇总的计算思想,就是MapReduce的设计核心。

分布式计算

我们自己写的JDBC代码是在一台机器上运行,mysql数据库是在另一台机器上运行。

正常情况下,我们通过jdbc代码去mysql中获取一条数据,速度还是很快的,但是有一个极端情况,如果我们要获取的数据量很大,达到了几个G,甚至于几十G,就会很慢,主要是两方面原因。

1.磁盘IO

2.网络IO

这两个里面其实最耗时的还是网络 io , 我们平时在两台电脑之间传输一个几十 G 的文件也需要很长时间的,但是如果是使用U盘拷贝就很快了,所以可以看出来主要耗时的地方是在网络IO上面。

如果我们考虑把计算程序移动到mysql上面去执行,就可以节省网络IO

移动数据是传统的计算方式,现在的一种新思路是移动计算。

如果我们数据量很大的话,我们的数据肯定是由很多个节点存储的,这个时候我们就可以把我们的程序代 码拷贝到对应的节点上面去执行。

分布式计算的步骤

  • 1.对每个节点进行局部计算
  • 2.对每个节点的局部计算结果就行全局汇总

原理及核心编程思想

组件模块

MapReduce :MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和MapReduce自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

MapReduce 进程:

  • MrAppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责 Map 阶段的整个数据处理流程。并行处理输入数据
  • ReduceTask:负责 reduce阶段的整个数据处理流程。对 Map 结果进行汇总

数据块: Block 是 HDFS 物理上把数据分成一块一块。 数据块是 HDFS 存储数据单位。

数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。

Combiner:是MR程序中Mapper和Reducer之外的一种组件。Combiner是在每一个MapTask所在的节点运行。Combiner的作用为对每一个MapTask的输出进行局部汇总,以减小网络传输量。

核心编程思想

图中分为2个文件,一个200M,一个100M。一个block大小默认为128M,则第一个文件128M分配给一个MapTask,剩下72M分配给另外一个MapTask。

以词频统计为例:

Map阶段:

  • 读数据并按行处理
  • 按照指定分隔符进行切分单词
  • 切分结果为KV键值对(单词,1)
  • 将所有KV中的Key值按照首字母顺序溢写到两个分区的磁盘。

Reduce阶段:

  • 汇总多个MapTask的结果输出到结果文件。
  • MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段。
  • 如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

框架原理

InputFormat:对输入进行控制,FileInputFormat、TextInputFormat、CombineTextInputFormat等。

Mapper:数据源通过InputFormat取读后,交给Mapper进行后续业务逻辑(用户自己写的代码)处理。

Shuffle:包含排序、分区、压缩、合并等等。

Reducer:拉取Mapper阶段处理的数据,拉的过程中,要经历shuffle的过程。

OutputFormat:对输出的控制,比如可以输出到文件、mysql、Hbase、ES等。

详细工作流程

1.假设待处理文件200m

2.客户端submit()前,获取待处理数据的信息,然后根据参数配置,形成一个任务分配的规划。默认按128M切片,分为 0~128和 128~200。

3.客户端提交到集群包含:Job.split(job的切片),wc.jar(代码),Job.xml(job运行相关的参数)

4.YARN的ResourceManager(整个集群所有资源的管理者)开启Mrappmaster(单个任务运行的老大,为应用程序申请资源并分配给内部的任务),Mrappmaster会取读Job.split切片信息,根据切片个数开启MapTask个数。

5.MapTask启动后,通过InputFormat(默认实现是TextInputFormat,方法是createRecordReader,按行读LineRecordReader)读输入的文件。

6.数据源通过InputFormat取读后,交给Mapper进行后续业务逻辑运算(用户自己写的代码)处理

7.outputCollector输出收集器,向环形缓冲区写入数据,其实就是一块内存,一半用于存数据(key;value),另外一半存索引(描述数据的元数据,index为索引;partition为分区;keystart指key在内存存储在什么位置;valstart指value在内存存储在什么位置)。outputCollector默认大小100M。当写入80%的数据后(为什么80%是因为可以边接收数据边往磁盘溢写数据),开始反向写,把数据溢写到磁盘

8.在溢写之前会将缓冲区的数据按照指定的分区规则(默认分区是根据key的hashCode对ReduceTasks个数取模得到的,用户没法控制哪个key存储到哪个分区。但是可以自定义)进行分区和排序。图中2个分区,分区1会进入reduce1,分区2会进入reduce2,互相不影响。排序是对分区内的数据进行排序,对index(索引)通过快排按字典顺序进行排序

9.当写入80%的数据后(或者数据已经全部处理完),就会把环形缓冲区的数据溢写到磁盘。可能发生多次溢写,溢写到多个文件.

10.对所有溢写到磁盘的文件(已经有序,可以通过归并来排)进行归并排序合成一个文件。保证每个分区的数据是有序的。

11.Combine合并,预聚合(优化手段),可以对每个MapTask的输出进行局部汇总,以减少网络传输量

12.MrappMaster,所有MapTask任务完成后,启动相应数量的ReduceTask,并告知ReduceTask处理数据范围(数据分区)

13.ReduceTask主动从MapTask对应的分区,拉取数据。因为虽然每个MapTask的数据已经是有序,但是会从多个MapTask拉取数据,所以还要进行归并排序。

14.将数据传给reduce进行处理,一次读取一组数据。

15.GroupingComparator,用的比较少。hadoop默认分组是按key,也就是一个key是一组,GroupingComparator主要的作用是可以决定哪些数据作为一组。

16.最后通过OutputFormat输出,默认是TextOutputFormat。

Shuffle机制

何为Shuffle?

Map 方法之后, Reduce 方法之前的数据处理过程称之为 Shuffle。

首先,通过getPartition获取是哪个分区。标记分区后,进入环形缓冲区。一半用于存数据,另外一半存索引。当写入80%的数据后,就会反向溢写。在溢写之前会将缓冲区的数据进行排序。之后可以进行Combiner(可选)。然后进行多次溢写,一个是spill.index(索引),一个是Spill.out(数据)。之后对所有溢写到磁盘的文件进行归并排序。之后可以进行Combiner(可选)。之后可以设置压缩(提高传输效率)。之后数据写到磁盘上,等待reduce拉取数据。

ReduceTask主动从MapTask对应的分区,拉取数据。先尝试把数据存在内存里。如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。然后做分组(按相同key分组)。再进入Reduce方法。

从WordCount开始

流程:

1.开发Map阶段代码

2.开发Reduce阶段代码

3.组装job

Map阶段代码:

public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
     Logger logger = LoggerFactory.getLogger(MyMapper.class);
     @Override
     protected void map(LongWritable k1, Text v1, Context context)
             throws IOException, InterruptedException {
         //输出k1,v1的值
         //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
         //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
         //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
         //对获取到的每一行数据进行切割,把单词切割出来
         String[] words = v1.toString().split(" ");
         //迭代切割出来的单词数据
         for (String word : words) {
             //把迭代出来的单词封装成<k2,v2>的形式
             Text k2 = new Text(word);
             LongWritable v2 = new LongWritable(1L);
             //把<k2,v2>写出去
             context.write(k2,v2);
         }
     }
 }

Reduce阶段代码:

public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
     Logger logger = LoggerFactory.getLogger(MyReducer.class);
     @Override
     protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
             throws IOException, InterruptedException {
         //创建一个sum变量,保存v2s的和
         long sum = 0L;
         //对v2s中的数据进行累加求和
         for(LongWritable v2: v2s){
             //输出k2,v2的值
             //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
             //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
             sum += v2.get();
         }
         //组装k3,v3
         Text k3 = k2;
         LongWritable v3 = new LongWritable(sum);
         //输出k3,v3的值
         //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
         //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
         // 把结果写出去
         context.write(k3,v3);
     }
 }

组装Job:

public static void main(String[] args) {
     try{
         if(args.length!=2){
             //如果传递的参数不够,程序直接退出
             System.exit(100);
         }
         //指定Job需要的配置参数
         Configuration conf = new Configuration();
         //创建一个Job
         Job job = Job.getInstance(conf);
         //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
         job.setJarByClass(WordCountJob.class);
         //指定输入路径(可以是文件,也可以是目录)
         FileInputFormat.setInputPaths(job,new Path(args[0]));
         //指定输出路径(只能指定一个不存在的目录)
         FileOutputFormat.setOutputPath(job,new Path(args[1]));
         //指定map相关的代码
         job.setMapperClass(MyMapper.class);
         //指定k2的类型
         job.setMapOutputKeyClass(Text.class);
         //指定v2的类型
         job.setMapOutputValueClass(LongWritable.class);
         //指定reduce相关的代码
         job.setReducerClass(MyReducer.class);
         //指定k3的类型
         job.setOutputKeyClass(Text.class);
         //指定v3的类型
         job.setOutputValueClass(LongWritable.class);
         //提交job
         job.waitForCompletion(true);
     }catch(Exception e){
         e.printStackTrace();
     }
 }

接下来就可以打包发布到集群

指定mapreduce接收到的第一个参数:文件路径

指定mapreduce接收到的第二个参数:输出目录

访问 http://bigdata01:8088 也可以查看任务输出结果

在out输出目录中,_SUCCESS是一个标记文件,有这个文件表示这个任务执行成功了。 part-r-00000是具体的数据文件,如果有多个reduce任务会产生多个这种文件,多个文件的话会按照从0往下排

还要一点需要注意的 ,part 后面的 r 表示这个结果文件是 reduce 步骤产生的, 如果一个 mapreduce 只有 map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
7月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
84 1
|
分布式计算 资源调度 大数据
黑马程序员-大数据入门到实战-MapReduce & YARN入门
黑马程序员-大数据入门到实战-MapReduce & YARN入门
159 0
|
7月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
234 0
|
4月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
70 1
|
4月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
78 1
|
7月前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
7月前
|
分布式计算 Hadoop 大数据
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
【云计算与大数据计算】Hadoop MapReduce实战之统计每个单词出现次数、单词平均长度、Grep(附源码 )
319 1
|
7月前
|
分布式计算 并行计算 数据处理
什么是MapReduce?请简要解释其工作原理。
什么是MapReduce?请简要解释其工作原理。
110 0
|
7月前
|
存储 分布式计算 搜索推荐
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
【大数据技术Hadoop+Spark】MapReduce之单词计数和倒排索引实战(附源码和数据集 超详细)
228 0
|
机器学习/深度学习 分布式计算 大数据
大数据 - MapReduce:从原理到实战的全面指南
大数据 - MapReduce:从原理到实战的全面指南
1393 0

相关实验场景

更多