开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Hadoop 2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/75/detail/15842
Hadoop 2(二)
内容介绍:
一、Map Reduce
二、Map Reduce-WordCount
三、MapReduce
四、Java MapReduce
五、Scaling Out
六、MapReduce inside:JobTracker
七、YARN
四、Java MapReduce
public class MaxTemperatureMapper
extends Mapper{
private static final int MISSING= 9999;
@0verride
Public void map(LongWritable key,Text vable,Context context)
throws IOException, InterruptedException{
String line = value.toString();
String year-=line.substring(15,19);
Int airTemperature;
If(line.charAt(87)==
’
+
’
){//parselnt doesn
’
t like leading plus signs
airTemperature=Integer.parselnt(line.substring(88,92));
}else{
airTemperature=Integer. parselnt(line. substring(87,92));
}
String quality = line. substring(92,93)I;
If(airTemperature!=MSSING && quality.matchesf (“【01459】”)){
context.write(new Text(year), new IntWritable[air Temperature))
}
}
上图是代码,跟刚才是类似的,改写 map reduce 就行了,map 就是给一个字符串,会把15到19就是这一行里面的年份给抽取出来。就是给一行完整的记录抽取年份,然后需要看温度,是正的还是负的。是正就直接把后面四个字符取出来解析成一个整数,如果是负。需要把包含符号在内的字符拿出来一起解析。到温度之后,再看后面的字符,1表示字符这个温度是可以的,质量是可以的。如果是别的,有可能就不行。表示01459的5种就说明温度是可以接受,质量可以接受,其他就不能接受。是设计表格的时候约定的,这5种表示温度测出来没有问题,一方面温度不是9999表示没测到,另一方面温度的质量是接受了,这种情况下就产生一个年份加温度的数据。依次类推就是给输入里面每一个 value 都这样处理,当给很多条,就会产生很多个输出。
public class MaxTemperatureReducer
extends Reducer{
@Override
public void reduce (Text key/ IterablecIntWritable>values
Context (context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_ VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value. Get());
}
context.write(key, new IntWritable( maxValue));
}
}
然后看 reducer, 跟刚才的 word count 类似,给一个 key 会输出包含所有 key 的值,跟 word count 不一样的就是值不都是1,值是一组温度。这个函数的目的就是便利最大值。把最大的变量,跟当前的进行比较,比较完后的值就是最大温度,为了保证值比所有的温度都小,要取最小整数,处理完后在结果里面要显示这年的最大温度。
public class MaxTemperature {
public static void main (String【】args) throws Exception{
if (args.length != 2){
System.err.printin( "Usage: MaxTemperature cinput path> ")
System.exit(-1);
}
Configuration conf = new Configuration();
conf set("dfs.defaultFS", "hdfs://hadoop:9000");
Job job = Job.getinstange(conf, "max temperature")
job setlarByClass(MaxTemperature.class);
job. setobName( "Max temperature");
FilelnputFormat.addlnputPath(job, new Path(args[OI);
FileOutputFormat.setOutputPath(job, new Path(args[1J》)
job.setMapperClss(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass( Text.class); job.setOutputValueClass(IntWritable.class);
System.exit(job.waltForcompletion(true) ?0:1);
}
然后 job 跟前面的一样,作业是当前这个类,给作业起个名字。输入的文件在哪里输出文件在哪里,设置 map和reduce 多少,输入的 key 值是什么。这个代码跟刚才的没什么区别,只是具体的 maper 和 reducer 不同。有了这样的东西,再去运行,结果就会产生这样的输出。
这个例子能看到1901年的数据,一行有很多字符,然后里面描述的不同,刚才写了三个类,就是 job 类,另外两个一个是 maper 一个是 reducer。现在做的事情比较简单,运行这个 job 逐一输出到 output 里面,input 就是1901年和1902年两年的温度。然后就产生了 output 里面的结果,1901年查出来最高温是317,1902年是244。跟上一个例子的结果是一样的。
这个例子会发现运行两遍,结果是一样的,这就说明在运程的支持上去做处理,结果不会每次都不一样,结果一定是稳定的例子,第二我们可以看到,是用两个 maper 运行两个文件。然后做 reducer。跟刚才的例子道理类似,所有的复杂性全部给屏蔽掉,全部被我们看到的框架就屏蔽了。
public class MaxTemperatureReducer
extends ReducercText, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key/Iterablelvalues
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_ VALUE;
for (IntWritable value :values){
maxValue = Math.max( maxValue,value.get());
}
context.write(keynew IntWritable(maxValue));
}
}
上面就是 map reduce 框架的具体逻辑,两部分代码最高温度的例子 word count 的例子只是在 map 和 reduce逻辑上不同,其他的都一样。所以框架已经让运行变得非常简单,只需要关注业务逻辑就可以,其他复杂的已经全部处理了。
系统都基于 HDFS 来做处理。所以在 map reduce 进行处理的时候,实际可以在 HDFS 这个集群上去做 scaling out,也就是把任务分发到多个机器上面处理(把输入分成很多小块,尺寸都差不多。
●Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits.
一 Hadoop creates one map task for each split, which runs the user defined map function for each record in the split.
● Having many splits means the time taken to process each split is small compared to the time to process the whole input.
- On the other hand, if splits are too smal, then the overhead of managing the splits and of map task creation begins to dominate the total job execution time,.
-For most jobs,a g00d split size tends to be the size of an HDFS block,6 MB by default.
不同的map和reduce在不同的地方,处理其中的一小块,每一个 map 处理一小块,会产生一个结果,然后所有的处理完之后合并到一起产生输出,HDFS 里面默认存储大小是64MB。切分的时候最好也是64MB,当有一个20g 影片,给每一份里面打水印的时候需要分成很多64MB 的小块,如果不合适可以改。在执行 map 打水印动作的时候,就有很多 maper 可以提出来,在不同的部分里面处理。
五、Scaling Out
如果输入被分成了三块,每一块处理完之后,把结果拷到一个地方,通过 reduce 产生结果,甚至 reduce 本身就跟复杂,所以切开以后按照一定逻辑,
例如处理人名,A-N 结果放在上面,O-Z 放在下面,每个 map 都这样处理,然后 A-N 存到一个机器上,O-Z存到另外一个机器上,右边的上面就会所有 A-N 的,右边的下边就会有所有 O-Z 的。然后把他们合成一个文件,用一个reducer来处理,就得到了一个从 A-N、O-Z 的结果,合起来就是最终结果,在这个例子中,mapet和reducer都是多个.
如果操作足够简单,不需要reduce动作,那可能直接 map 出来,把结果合并产生一个输出。计算 maper 和 reducer 可以编程,也可以不去指定call,Hadoop 框架可以 master 自动处理,通常会根据机器上的一些配置信息来决定,map 不一定要和数量一样,可以少一点.