Hadoop 2(一)|学习笔记

简介: 快速学习 Hadoop 2(一)

开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:Hadoop 2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/75/detail/15842


Hadoop 2(一)

 

内容介绍:

一、Map Reduce

二、Map Reduce-WordCount

三、MapReduce

四、Java MapReduce

五、Scaling Out

六、MapReduce inside:JobTracker

七、YARN

 

一、Map Reduce

The MapReduce framework

-operates exclusively on  pairs, that is, the framework viewS the input to the job as a set of  pairs and produces a set of  pairs as the output of the job, conceivably of different types.

The key and value classes

-have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the Writablecomparable interface to facilitate sorting by the framework.

Input and Output types of a MapReduce job:

- (input)->map ->-> combine ->-> reduce ->(output)

以上出现的 MapReduce 处理的都是键值对。key、value 要去实现W ritable 的结果。然后 MapReduce 作业的输入输出类型分别是键值对作为输入进入 Map 之后,把它映射成为一个新的键值对,然后通过 combine 等多个 map出来的结果最后产生一个输出。

 

二、Map Reduce-WordCount

public class WordCount {

public static class TokenizerMapper

extends MappercObject, Text, Text, IntWritable>{

Private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString0));

while (itr.hasMoreTokens()){

wordset(itr.next Token0);

context.write(word, one);

}

}

}

代码 WordCount。代码是首先有个 map,扩展出一个 may,在代码中真正起作用的是 map 这个方法把把传递过来的值转换为一个字符串。就是图中传递进来的文本给转成字符串,拿到字符串之后就需要通过空格断开成每个 Token,看里面是否有更多的 Token,如果有就要把每一个 Token 都做处理。

这个处理需要把 Token 拿出来,拿出来之后要往最终 text 里面写进去一个内容,就是写单词出现了几次,类似,这样的键值对。可以看到,每次拿到文本,断开,之后看里面的每一个单词,每个单词都设置成一个word 对象,这个 word对象可以在图里看到,都设置成了 text 对象,然后每一次写进去"text one",最终 text 里面就包含了所有输入的文本,所有里面的单词的一个便利结果(注意这个编程里面并没有统计,是碰到一个单词就产生一个输出,如果一个词出现了多次,就会出现多个。)

public static class IntSumReducer

extends Reducer{

private IntWritable result= new IntWritablel);

public void reduce(Text key, Iterable values Context context  

) throWs IOException, InterruptedException

int sum = 0;

for (IntWritable val : values) {\

Sum += val.get0);

}

result.set(sum);

context.write(key, result);

}

}

reduce 是另外一类,reducer 里面最重要的是 reduce,代码很像。map  对应 map,reduce 对应 reducer,两个是不同的类,为了方便给的例程里面是放到了 WordCount 类,作为这两类的两个子类,这两个内部类 static,意味这两个类将来编译出来之后不依赖于 WordCount,WordCount 可以单独存在。然后是 reducer,给 reducer 一批东西,context也是 reducer 的结果,便利一下 values 里面的内容。

给了一个 context 的可能 key,表示一个单词,我是对每一个单词种找到组,有一组整数,把这组整数里面的每一个整数提出来,从里面获取它的数量,累加到之前的 sum 上面,就会得到当前的这个 key 所有出现的次数,把这个总数设置到 result 里。最终的 key 没变,result 就是统计出来的结果。

Use the following:

etc/hadoop/core-site.xml:

<configuration>

<property>

<name>fs.defaultFS</name>

<value>hdfs://localhost:9000</value>

</property>

</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>

<property>

<name>dfs.replications/name>

<value>1</value>

</property>

</configuration>

最后是整个 word count 类的一个 main 函数,这个函数就是前面的程序在做的全是准备工作,第一个就是创建了一个configuration 对象,里面需要设置用的位置,和下图的程序里面的设计一样,可以对应上。上面两个参数必须要设置原因,一个是 dfs 分布系统缺少什么,系统的位置在哪里,两个参数是一样的。

然后针对 conf  产生一个作业,起个名字叫 word count,然后要尝试设置 word count 为 map,和上面的程序一样。中间还有 combiner,意思就是把多个不同的 map 产生的结果放在和的时候中间还有一个像 shaper 那种动作,所这个例子非常简单,combiner 和 reducer 的动作是一个类来实现。即把这一个 key 对应的所有次数的东西组装到一个集合类里面以及对这个集合类里面所有整数做一个累加,是一个类来实现。上面这个程序是在说明整个 job 是什么来表示的,也就是当前写的 word count class。然后输出里面的 key 是文本,值是次数(整数),和前面内容也有对应。输入和输出一定要对应。

public static void main(Stringl args) throws Exception{

Configuration conf = new Configuration();

conf.se(" dfs.defaultFS", "hdfs://hadoop:9000");

Job job = Job.getlnstance(conf,word coUnte);

job.setarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setcombinerClass(IntSumReducer.Classi job.setReducerClass(lntSumReducer.class);

job.setOutputKeyClass( Text.class);

job.setOutputValueClass(IntWritable.class);

FilelnputFormat.addlnputPath(job, new Path(args[oJ);

FileOutputFormat.setOutputPath(job, new Path(args[1I)):

System.exit(job.waitForcompletion(true) ?0:1);

输入文件和输出文件分别用运行加上虚拟机时候传递的参数表示,可以对指定目录下的文件进行范围操作,输出到指定的目录里。然后等系统退出,等 job 完成,正常退出,如果任务是异常方式完成的,那系统就要以结束码为1的方式解除;如果没问题,程序就会以0结束,如果出现任何问题都会以1结束。

image.png

image.png

如图是一个代码,已经运行了 start-dfs.sh 脚本,已经运行了一个系统,然后可以看到一个代码,之前已经运行过一次,所以结果表示没有问题,之后就退出结束。代码本身的内容跟之前一样,不同的是之前的 ppt 里面内容过长,已经删除,从后面开始。运行的时候就是这样,通过参数可以看到,

image.png

input2目录里东西写到output2里去,如果出现过一次,第二遍的时候会提示output2目录已经存在,不让继续运行,要删除。所以其实只有一个代码 word count ,运行。可以看到 input2 里面有两个文件,一个是 Hello World Bye World ,一个是 Hello Hadoop Goodbye Hadoop 两个文件,对这两个文件再次统计,观察每个单词出现多少次,再次运行,

image.png

运行之后发现有个明显停顿,实际在执行 Hadoop 的时候,时间是图中这样变化的,

image.png

有一部分启动时间,即任何一个程序都要花大概相同的时间,后面要随文件的尺寸,时间,增加一个时间,所以当文件尺寸较大的时候,这个问题才会被分摊掉,才会觉得停顿不明显,现在示例文件太小,所以会觉得有明显卡顿,得出结论就是不适合处理小结论。然后运行之后发现多了一个 output2 的目录,也就是处理完的结构目录,结构目录主要是文件统计的结果,按字母排序统计的

image.png

bye 一次 ;Goodbye 一次; Hadoop 两次;Hello 两次 ;World 两次。 通过这个例子,确定结果是完全正确的。

再举一个较大数据的例子,开始弄一个 result 的文本文件,input1里面输出到 output 1 里面,再次运行 outcome,这次花的时间较长一些,但是没有明显的差别,从结果可以看出文件规模比刚才小例子多。当文件规模很小,运行的效果不明显,越大优势越明显。

image.png

看结果,分号开头加了一个 get,那就是说他有一块一块,所以要做好,需要对结果再做处理,把标点符号都删掉。文件就变得很长了,这个例子就有722行,所以共有722个单词。

image.png

为了运行这些例子,需要引入一些东西,所以就有了Hadoop-common,client-common等。这样代码才能运行。

<dependencies>

<dependency>

<groupld>org.apache.hadoop</groupld>

<artifactid>hadoop-common </artifactid>

<version>3.2.1 </version>

</dependency>

<dependency>

<groupld>org.apache.hadoop</groupld>

<artifactid>hadoop-mapreduce-client-commen< artfactid>

<version>3.2.1</version>

</dependeney>

</dependency>

<groupld> org.apache.hadoop /groupld>

<artifactld>hadoop-mapreduce-client-common/artactid>

<version>3.2.1</version>

<dependency>

</dependencies>

/project>

运行的代码用的 meand 创建,所以在结果有一个依赖库,直接查就可以。

通过例子,大体上知道 map reduce 的工作原理。这两个文件,得出了两个结果。

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr.= new StringTokenizer(value.toString());

while (itr.hasMoreTokens(){

word.set(itr.nextToken());

context.write(word, one);

}

}

For the given sample input the first map emits:

<Helo, 1><World, 1>< Bye, 1><World, 1>The second map emits:<Helo, 1><Hadoop, 1><Goodbye, 1>< Hadoop, 1>

第一个文件因为有两个文件,所以它里面有两个 map,第一个 map 产生的这样的数据,也就是针对每一个在处理一个文件里面传递进去的 value 的文本的内容,分别会产生不同的输出,然后因为已经指定了 combiner 也是 reduce这一类,也就是 combiner 会把同一个 map reduce 里面的结果再做一次处理,在同一个 reduce 里面再把两部分结合起来,最后得到了这样的结果。


三、MapReduce

A Weather Dataset

- The data we will use is from the National Climatic Data Center NCDC http:WWW.ncdc.n0aa,gOV). The data is stored using a line-oriented ASCIIformat, in which each line is a record.

-Sample: The line has been split into multiiE lines to show each field: in the real file, fields are packed into one lincith no delimiters.

0057  

332130    #USAF weather station identifier

99999   # WBAN weather station ldentifier

19500101 # observation date

0300    #observation time

4

+51317 #latitude (degreesX1000)

+028783# longitude (degrees x 1000)

FM-12

+-0171 # elevation [meters)

.......

再看一个更复杂的例子,在例子里的网站上是天气数据,天气数据是一行一行的去表示的,但是每行表示的比较麻烦,断开了一行数字和符号包含了不同的部分,是拼接起来的。然后可以看到日期等的观测时间,观测的经度纬度等,由不同的列构成。目的是知道在统计的时候,每一年的最高温度是多少,哪一年哪一月的哪一天观测到的数据是什么。由例子能知道一年的数据,能知道什么时候温度最高,有两个年份就会有两个文件,分别输出两年里温度最高的温度是多少。所以 map reduce 就会作用于这个阶段或者 reduce 阶段都是产生的键值队,两种情况不一样。所以在 map 阶段拿到原始的数据之后,需要把数据集里面根据刚才讲的每一行数据里面每一部分都表示什么意思把表示温度的那一个 offset 拿出来。抽取年份和温度出来,因为只处理哪一年这个温度,所以在map函数里面要去做这样的处理,在 reduce 阶段找每一年的最高温度。所以 map 和 reduce 分别要做这样的命令。还要把不良数据过滤掉,即在温度缺失或者错误情况下,把不良数据过滤掉。

To visualize the way the map works , consider the following sample lines of input data

00670119909999919500515070049999999N9+00001+99999999999...

0043011990999991950051512004.9999999N9+00221+99999999999...

0043011990999991950051518004.9999999N9-00111+99999999999...

0043012650999991949032412004._0500001N9101111199999999999...

0043012650999991949032418004..0500001N9+0078199999999999...

These lines are presented to the map function as the key - value pairs:

(0.0067011990999991950051507004_9999999N9+00001+99999999999)

(106,0043011990999991950051512004...9999999N9+00221+99999999999)

(212.00430119909999919500515180049999999N9-0011l+99999999999

(318,00430126509999919490324120040500001N9+0111l+99999999999)

(424,0043012650999991949032418004050000IN9+00781+99999999999.)

这是我们刚才看到的真正的每一行数据都是这么写的。要知道在里面,每一行看起来是一行一行的,但还有一个问题,在数据集里,整个行是在一个文件里,一行是106字符。实际要做的事情是在 map 这个阶段,行是没有用的,关键的问题是要断成一条一条记录(从文件的开头跳过字节取一条记录,后面信息就表示记录)。一条记录是106字节,所以第二条记录也是跳过106字节再取,依次类推。map 的目的就是产生偏移量以及真正表示这条数据的内容。

The key are the line offsets within the file.which we ignore in our map function.

- The map function merely extracts the year and the air temperature (indicated in bold text).and emits them as its output(the temperature values have been interpreted as integers):

(1950,0)

(1950,22)

(1950,-11)

(1949,111)

(1949,78)

The output from the map function is processed by the MapReduce framework before being sent to the reduce function.

This processing sorts and groups the key-value pairs by key .

-So, continuing the example, our reduce function sees the following input

(1949. [111,78])

(1950, 0.22-11】)

Each year appears with a list of al its air temperature readings.

这个 map 函数做的第一个动作就是把年份和记录先抽取出来,在 map 里我们在之前基础上只抽取年份和温度,map输出之后,要做一个 combiner,即在本地记录里面的单词。所以在 map 这个阶段要产生这样的东西。

image.png

然后再有 reduce,就是每一个年份后面跟着的集合里面取最大的出来。整个过程就是原始的输入,然后需要预处理产生输出,即把一条一条记录拿出来,然后在 map 阶段要把偏移量抽取出来,抽取年份和温度产生键值对。然后需要做一个 shuffle 把本地处理的结果合并得到了1949年和1950年的所有的温度,然后 reduce 就是在集合里面去挑取最高温度,最终产生一个结果。

相关文章
|
9月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.19 安装Kafka
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
225 0
Hadoop学习笔记(HDP)-Part.19 安装Kafka
|
9月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.08 部署Ambari集群
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
320 0
Hadoop学习笔记(HDP)-Part.08 部署Ambari集群
|
9月前
|
分布式计算 资源调度 Hadoop
安装hadoop学习笔记
安装hadoop学习笔记
82 0
安装hadoop学习笔记
|
9月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
337 2
Hadoop学习笔记(HDP)-Part.18 安装Flink
|
9月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.16 安装HBase
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
179 1
Hadoop学习笔记(HDP)-Part.16 安装HBase
|
9月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.15 安装HIVE
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
257 1
Hadoop学习笔记(HDP)-Part.15 安装HIVE
|
9月前
|
分布式计算 Hadoop 关系型数据库
Hadoop学习笔记(HDP)-Part.10 创建集群
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
215 1
Hadoop学习笔记(HDP)-Part.10 创建集群
|
9月前
|
SQL 消息中间件 关系型数据库
Hadoop学习笔记(HDP)-Part.04 基础环境配置
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
139 1
|
9月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.20 安装Flume
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
155 0
Hadoop学习笔记(HDP)-Part.20 安装Flume
|
9月前
|
分布式计算 资源调度 Java
Hadoop学习笔记(HDP)-Part.17 安装Spark2
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
132 0
Hadoop学习笔记(HDP)-Part.17 安装Spark2

相关实验场景

更多