MapReduce介绍
计算扑克牌中的黑桃个数
就是我们平时打牌时用的扑克牌,现在呢,有一摞牌,我想知道这摞牌中有多少张黑桃
最直接的方式是一张一张检查并且统计出有多少张是黑桃,但是这种方式的效率比较低,如果说这一摞牌只有几十张也就无所谓了,如果这一摞拍有上千张呢?你一张一张去检查还不疯了?
这个时候我们可以使用MapReduce的计算方法
第一步:把这摞牌分配给在座的所有玩家
第二步:让每个玩家查一下自己手中的牌有多少张是黑桃,然后把这个数目汇报给你
第三步:你把所有玩家告诉你的数字加起来,得到最终的结果
之前是一张一张的串行计算,现在使用mapreduce是把数据分配给多个人,并行计算,每一个人获得一个局部聚合的临时结果,最终再统一汇总一下。
这样就可以快速得到答案了,这其实就是MapReduce的计算思想。
分布式计算介绍
移动数据
我们自己写的JDBC代码是在一台机器上运行,mysql数据库是在另一台机器上运行。正常情况下,我们通过jdbc代码去mysql中获取一条数据,速度还是很快的,
但是有一个极端情况,如果我们要获取的数据量很大,达到了几个G,甚至于几十G。这个时候我们使用jdbc代码去拉取数据的时候,就会变得非常慢,
这个慢主要是由于两个方面造成的, 一个是磁盘io(会进行磁盘读写操作), 一个是网络io(网络传输)。
这两个里面其实最耗时的还是网络io,我们平时在两台电脑之间传输一个几十G的文件也需要很长时间的,但是如果是使用U盘拷贝就很快了,所以可以看出来主要耗时的地方是在网络IO上面。
这种计算方式我们称之为移动数据
新思潮: 移动计算比移动数据更划算,因为大量数据的时间耗时主要是在网络I/O 和磁盘I/O.
案例:
如果copy一个50G的内容,就算是内网的两台机器也很慢,如果是用U盘,那就比较快。
为什么呢? 因为没有网络IO,而磁盘IO一直是存在的。
移动计算
如果我们考虑把计算程序移动到mysql上面去执行,是不是就可以节省网络io了,是的! 这种方式称之为移动计算,就是把计算程序移动到数据所在的节点上面
如果我们数据量很大的话,我们的数据肯定是由很多个节点存储的,这个时候我们就可以把我们的程序代码拷贝到对应的节点上面去执行,程序代码都是很小的,一般也就几十KB或者几百KB,加上外部依赖包,最大也就几兆 ,甚至几十兆,但是我们需要计算的数据动辄都是几十G、几百G,他们两个之间的差距不是一星半点
这样我们的代码就可以在每个数据节点上面执行了,但是这个代码只能计算当前节点上的数据的,如果我们想要统计数据的总行数,这里每个数据节点上的代码只能计算当前节点上数据的行数,所以还的有一个汇总程序,这样每个数据节点上面计算的临时结果就可以通过汇总程序得到最终的结果了。
此时汇总程序需要传递的数据量就很小了,只需要接收一个数字即可。
这个计算过程就是分布式计算,这个步骤分为两步
第一步:对每个节点上面的数据进行局部计算
第二步:对每个节点上面计算的局部结果进行最终全局汇总
MapReduce原理剖析
MapReduce是一种分布式计算模型,是Google提出来的,主要用于搜索领域,解决海量数据的计算问 题.
MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,
Map阶段是一个独立的程序,在很多个节点同时运行,每个节点处理一部分数据。
Reduce阶段也是一个独立的程序,可以在一个或者多个节点同时运行,每个节点处理一部分数据
在这map就是对数据进行局部汇总,reduce就是对局部数据进行最终汇总。
结合到我们前面分析的统计黑桃的例子中,这里的map阶段就是指每个人统计自己手里的黑桃的个数,reduce就是对每个人统计的黑桃个数进行最终汇总
在这我们再举一个例子,看这个图
这是一个Hadoop集群,一共5个节点一个主节点,四个从节点
假设我们有一个512M的文件,这个文件会产生4个block块,假设这4个block块正好分别存储到了 4个节点上,我们的计算程序会被分发到每一个数据所在的节点,然后开始执行计算,在map阶段每一个block块对应的数据都会产生一个map任务(这个map任务其实就是执行这个计算程序的), 也就意味着会产生4个map任务并行执行,4个map阶段都执行完毕以后,会执行reduce阶段,在r 阶段中会对这4个map任务的输出数据进行汇总统计,得到最终的结果。
官方的mapreduce原理图
左下角是一个文件,文件最下面是几个block块,说明这个文件被切分成了这几个block块,文件上面是一些split,注意,咱们前面说的每个block产生一个map任务,其实这是不严谨的,其实严谨一点来说的话 应该是一个split产生一个map任务。
block和split之间有什么关系?
block块是文件的物理切分,在磁盘上是真实存在的。是对文件的真正切分
split是逻辑划分,不是对文件真正的切分,默认情况下我们可以认为一个split的大小和一个block的大 小是一样的,所以实际上是一个split会产生一个map task
这里面的map Task就是咱们前面说的map任务,看后面有一个reduce Task,reduce会把结果数据输出到hdfs上,有几个reduce任务就会产生几个文件,这里有三个reduce任务,就产生了3个文件。
map的输入 输出 reduce的输入 输出
map的输入是k1,v1 输出是k2,v2
reduce的输入是k2,v2 输出是k3,v3 都是键值对的形式。
map端做的是局部的聚合排序,reduce是全局的聚合和排序。reduce是处理多个map多个分区的聚合结果。不可以混淆两者
MapReduce之Map阶段
mapreduce主要分为两大步骤 map和reduce,map和reduce在代码层面对应的就是两个类,map对应的是mapper类,reduce对应的是reducer类,下面我们就来根据一个案例具体分析一下这两个步骤
假设我们有一个文件,文件里面有两行内容
第一行是hello you
第二行是hello me
首先是map阶段
第一步框架会把输入文件(夹)划分为很多InputSplit,这里的inputsplit就是前面我们所说的split 件进行逻辑划分产生的,默认情况下,每个HDFS的Block对应一个InputSplit。再通过Record 类,把每个InputSplit解析成一个一个的<k1,v1>。k1是指每一行的起始偏移量,v1代表的是那一行内容,
所以,针对文件中的数据,经过map处理之后的结果是这样的
<0,hello you>
<10,hello me> 这里的10指的是第二行的开始偏移量.
注意:map第一次执行会产生<0,hello you>,第二次执行会产生<10,hello me>
不是执行一次就获取到这两行结果了,因为每次只会读取一行数据
第二步:框架调用Mapper类中的map(…)函数,map函数的输入是<k1,v1>,输出是<k2,v2>。一个InputSplit对应一个map task。程序员需要自己覆盖Mapper类中的map函数,实现具体的业务逻辑。因为我们需要统计文件中每个单词出现的总次数,所以需要先把每一行内容中的单词切开,然后记录出现次数为1,这个逻辑就需要我们在map函数中实现了
那针对<0,hello you>执行这个逻辑之后的结果就是
<hello,1>
<you,1>
针对<10,hello me>执行这个逻辑之后的结果是
<hello,1>
<me,1>
第三步:框架对map函数输出的<k2,v2>进行分区。不同分区中的<k2,v2>由不同的reduce task处理
默认只有1个分区,所以所有的数据都在一个分区,最后只会产生一个reduce task。
经过这个步骤之后,数据没什么变化,如果有多个分区的话,需要把这些数据根据分区规则分开,在这里默认只有1个分区。
<hello,1>
<you,1>
<hello,1>
<me,1>
单词计数,其实就是把每个单词出现的次数进行汇总即可,需要进行全局的汇总,不需要进行分区,所以一个redeuce任务就可以搞定,
如果你的业务逻辑比较复杂,需要进行分区,那么就会产生多个reduce任务了,
map任务输出的数据到底给哪个reduce使用?这个就需要划分一下,要不然就乱套了。 假设有两个reduce,map的输出到底给哪个reduce,如何分配,这是一个问题。
这个问题,由分区来完成。
map输出的那些数据到底给哪个reduce使用,这个就是分区干的事了。
第四步:框架对每个分区中的数据,都会按照k2进行排序、分组。分组指的是相同k2的v2分成一个组。先按照k2排序
<hello,1>
<hello,1>
<me,1>
<you,1>
然后按照k2进行分组,把相同k2的v2分成一个组
<hello,{1,1}>
<me,{1}>
<you,{1}>
第五步:在map阶段,框架可以选择执行Combiner过程【可选步骤】
Combiner可以翻译为规约,规约是什么意思呢?
在刚才的例子中,咱们最终是要在reduce端计算单词出现的总次数的,所以其实是可以在map端提前执行reduce的计算逻辑,先对在map端对单词出现的次 数进行局部求和操作,这样就可以减少map端到reduce端数据传输的大小,这就是规约的好处,当然 了,并不是所有场景都可以使用规约,针对求平均值之类的操作就不能使用规约了,否则最终计算的结果就不准确了。
Combiner一个可选步骤,默认这个步骤是不执行的。
第六步:框架会把map task输出的<k2,v2>写入到linux 的磁盘文件中
<hello,{1,1}>
<me,{1}>
<you,{1}>
至此,整个map阶段执行结束最后注意一点:
MapReduce程序是由map和reduce这两个阶段组成的,但是reduce阶段不是必须的,也就是说有的
mapreduce任务只有map阶段,为什么会有这种任务呢?
reduce主要是做最终聚合的,如果我们这个需求是不需要聚合操作,直接对数据做过滤处理就行了,那也就意味着数据经过map阶段处理完就结束了,所以如果reduce阶段不存在的话,map的结果是可以直接保存到HDFS中的
注意,如果没有reduce阶段,其实map阶段只需要执行到第二步就可以,第二步执行完成以后,结果就可以直接输出到HDFS了。
MapReduce之Reduce阶段
第一步:框架对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程 称作shuffle
针对我们这个需求,只有一个分区,所以把数据拷贝到reduce端之后还是老样子
<hello,{1,1}>
<me,{1}>
<you,{1}>
第二步:框架对reduce端接收的相同分区的<k2,v2>数据进行合并、排序、分组。
reduce端接收到的是多个map的输出,对多个map任务中相同分区的数据进行合并 排序 分组
注意,之前在map中已经做了排序 分组,这边也做这些操作 重复吗?
不重复,因为map端是局部的操作 reduce端是全局的操作
之前是每个map任务内进行排序,是有序的,但是多个map任务之间就是无序的了。
不过针对我们这个需求只有一个map任务一个分区,所以最终的结果还是老样子
<hello,{1,1}>
<me,{1}>
<you,{1}>
第三步:框架调用Reducer类中的reduce方法,reduce方法的输入是<k2,{v2}>,输出是<k3,v3>。一个
<k2,{v2}>调用一次reduce函数。程序员需要覆盖reduce函数,实现具体的业务逻辑。
那我们在这里就需要在reduce函数中实现最终的聚合计算操作了,将相同k2的{v2}累加求和,然后再转
化为k3,v3写出去,在这里最终会调用三次reduce函数
<hello,2>
<me,1>
<you,1>
第四步:框架把reduce的输出结果保存到HDFS中。
hello 2
me 1
you 1
至此,整个reduce阶段结束。
单文件wordConut图例
多文件wordCount图例
wordCount java代码实现
/** * Map阶段 */ public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); /** * 需要实现map函数 * 这个map函数就是可以接收<k1,v1>,产生<k2,v2> * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //输出k1,v1的值 //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //迭代切割出来的单词数据 for (String word : words) { //把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } } /** * Reduce阶段 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyReducer.class); /** * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //创建一个sum变量,保存v2s的和 long sum = 0L; //对v2s中的数据进行累加求和 for(LongWritable v2: v2s){ //输出k2,v2的值 //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出k3,v3的值 //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // 把结果写出去 context.write(k3,v3); } } /** * 组装Job=Map+Reduce */ public static void main(String[] args) { try{ if(args.length!=2){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configuration conf = new Configuration(); //创建一个Job Job job = Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJob.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,new Path(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,new Path(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //提交job job.waitForCompletion(true); }catch(Exception e){ e.printStackTrace(); } }
wordCount程序在hadoop上的执行
note: 通过host访问hadoop yarn界面
需要修改C:\Windows\System32\drivers\etc\host
添加一下内容
192.168.56.10 node1
hadoop-2.7.5]# hadoop jar /opt/soft/hadoop-2.7.5/db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out 21/08/16 13:59:42 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 21/08/16 13:59:44 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 21/08/16 13:59:45 INFO input.FileInputFormat: Total input paths to process : 1 21/08/16 13:59:45 INFO mapreduce.JobSubmitter: number of splits:1 21/08/16 13:59:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1629122147114_0002 21/08/16 13:59:47 INFO impl.YarnClientImpl: Submitted application application_1629122147114_0002 21/08/16 13:59:47 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1629122147114_0002/ 21/08/16 13:59:47 INFO mapreduce.Job: Running job: job_1629122147114_0002 21/08/16 14:00:11 INFO mapreduce.Job: Job job_1629122147114_0002 running in uber mode : false 21/08/16 14:00:11 INFO mapreduce.Job: map 0% reduce 0% 21/08/16 14:00:27 INFO mapreduce.Job: map 100% reduce 0% 21/08/16 14:00:41 INFO mapreduce.Job: map 100% reduce 100% 21/08/16 14:00:42 INFO mapreduce.Job: Job job_1629122147114_0002 completed successfully 21/08/16 14:00:44 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=65 FILE: Number of bytes written=243715 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=116 HDFS: Number of bytes written=19 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=13624 Total time spent by all reduces in occupied slots (ms)=12076 Total time spent by all map tasks (ms)=13624 Total time spent by all reduce tasks (ms)=12076 Total vcore-milliseconds taken by all map tasks=13624 Total vcore-milliseconds taken by all reduce tasks=12076 Total megabyte-milliseconds taken by all map tasks=13950976 Total megabyte-milliseconds taken by all reduce tasks=12365824 Map-Reduce Framework Map input records=2 Map output records=4 Map output bytes=51 Map output materialized bytes=65 Input split bytes=97 Combine input records=0 Combine output records=0 Reduce input groups=3 Reduce shuffle bytes=65 Reduce input records=4 Reduce output records=3 Spilled Records=8 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=365 CPU time spent (ms)=2800 Physical memory (bytes) snapshot=311902208 Virtual memory (bytes) snapshot=4160294912 Total committed heap usage (bytes)=165810176 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=19 File Output Format Counters Bytes Written=19
MapReduce任务日志查看
在自定义mapper类的map函数中增加一个输出,将k1,v1的值打印出来
重新打包在hadoop集群上执行
注意,针对输出目录,要么换一个新的不存在的目录,要么把之前的out目录删掉
mapreduce任务正在执行的时候是可以通过history查看log,包括map类中自定义的log。
当任务执行完毕之后,这个链接就点不开了.
如果想查看应该怎么办呢?
1、yarn-site增加配置log server
<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <property> <name>yarn.log.server.url</name> <value>http://node1:19888/jobhistory/logs/</value> </property>
2、重启集群
3、启动historyserver进程,需要在集群的所有节点上都启动这个进程
bin/mapred --daemon start historyserver
如果是hadoop2.7使用
hadoop-2.7.5]# mr-jobhistory-daemon.sh start historyserver
4、jps确认
5、重新提交任务并通过yarn界面查看log
这边还有一个报错,等待解决.
此外还可以直接用yarn 命令来查看log
按照这片文章修复这个问题: https://blog.csdn.net/yh_zeng2/article/details/52281712
停止Hadoop集群中的任务
如果一个mapreduce任务处理的数据量比较大的话,这个任务会执行很长时间,可能几十分钟或者几个小时都有可能,假设一个场景,任务执行了一半了我们发现我们的代码写的有问题,需要修改代码重新提交执行,这个时候之前的任务就没有必要再执行了,没有任何意义了,最终的结果肯定是错误的,所以我们就想把它停掉,要不然会额外浪费集群的资源,如何停止呢?
CTRL +C是无法停止已经提交到集群的任务的.
yarn application -kill application_1629126377681_0001
MapReduce程序扩展
reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。什么场景下会只需要map阶段呢?
当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了,代码中如何设置呢?
//禁用reduce阶段 job.setNumReduceTasks(0);