章节内容
上一节我们完成了:
新工程的建立 和 POM 的导入
Java连接到HDFS集群
Java操作HDFS集群,如上传下载,遍历目录,PUT GET 等等操作
背景介绍
这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。
注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!
请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!
但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:
2C4G 编号 h121
2C4G 编号 h122
2C2G 编号 h123
MapReduce 图片介绍
Hadoop 序列化
为什么是Hadoop实现的序列化,而不是使用Java自带的?
序列化在分布式中非常重要,在Hadoop中,集群中多个节点之间的通信是通过RPC实现的,RPC将数据序列化为二进制的流发送到远程节点,远程节点接收到二进制的流数据之后再转换为原始的消息。
RPC可以更小的体积更快的速度。
Hadoop使用自己的Writable,它比Java的序列化更紧凑更快,一个对象使用序列化后,会携带额外的校验信息等等···
Mapper规范
用户自定义一个Mapper类继承Hadoop的Mapper类
Mapper的输入数据是KV的形式
Map阶段的业务逻辑定义子啊map()方法中
Mapper的输出数据是KV对的形式
Reducer规范
用户自定义Reducer类要继承Hadoop的Reducer类
Reducer的输入数据类型对应Mapper的数据类型
Reducer的业务逻辑在reduce()方法中
Reduce()方法是对相同的K的一组KV对 调用执行一次
Driver规范
创建提交YARN集群运行的JOB对象,其中封装了MapReduce程序运行所需要的相关参数:
输入数据路径
输出数据路径
Mapper
Reducer
也相当于一个YRAN集群的客户端,主要作用就是提交我们的MapReduce程序运行。
WordCount
需求介绍
下面是我做的一些操作,这里有一个 1.txt 文档,当中是一些文本内容。我们将对其做计算,统计出每个单词出现的频率。
root@hecs-393573:/opt/wzk# ls wordcount.txt root@hecs-393573:/opt/wzk# hadoop fs -get /wzk/test/1.txt ./ root@hecs-393573:/opt/wzk# ls 1.txt wordcount.txt root@hecs-393573:/opt/wzk# hadoop fs -ls / Found 5 items
操作输出的结果 如下
实现步骤
- 创建工程
- 导入POM
- 编写 Mapper
- 编写 Reducer
- 编写 Driver
POM
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>hadoop-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- Hadoop Dependencies --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> </dependencies> <!--maven打包插件 --> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Mapper编写
按照上述的规范,我们编写一个Mapper出来
package icu.wzk.demo02; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words) { k.set(word); context.write(k, v); } } }
Reducer编写
同样,按照之前文章中的说的规范,编写一个 Reducer 出来。
package icu.wzk.demo02; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { sum = 0; for (IntWritable count : values) { sum += count.get(); } v.set(sum); context.write(key, v); } }
Driver编写
package icu.wzk.demo02; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // String inputPath = args[0]; // String outputPath = args[1]; // === 测试 === String inputPath = "wc.txt"; String outputPath = "wc-out"; // ======= // 配置 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 运行的类 job.setJarByClass(WordCountDriver.class); // Mapper job.setMapperClass(WordCountMapper.class); // Reducer job.setReducerClass(WordCountReducer.class); // Map Output Key Value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 最终 Key Value job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 路劲参数 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 等待结果 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
本地测试
在代码中,修改为本地的路径。
// === 测试 === String inputPath = "wc.txt"; String outputPath = "wc-out"; // =======
运行 Driver
代码之后,我们发现文件目录中生成了如下的结果
打开文本内容,我们可以看到如下的结果