Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

简介: Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

章节内容

上一节我们完成了:


新工程的建立 和 POM 的导入

Java连接到HDFS集群

Java操作HDFS集群,如上传下载,遍历目录,PUT GET 等等操作

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!

请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!


但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

MapReduce 图片介绍

Hadoop 序列化

为什么是Hadoop实现的序列化,而不是使用Java自带的?


序列化在分布式中非常重要,在Hadoop中,集群中多个节点之间的通信是通过RPC实现的,RPC将数据序列化为二进制的流发送到远程节点,远程节点接收到二进制的流数据之后再转换为原始的消息。

RPC可以更小的体积更快的速度。

Hadoop使用自己的Writable,它比Java的序列化更紧凑更快,一个对象使用序列化后,会携带额外的校验信息等等···

Mapper规范

用户自定义一个Mapper类继承Hadoop的Mapper类

Mapper的输入数据是KV的形式

Map阶段的业务逻辑定义子啊map()方法中

Mapper的输出数据是KV对的形式

Reducer规范

用户自定义Reducer类要继承Hadoop的Reducer类

Reducer的输入数据类型对应Mapper的数据类型

Reducer的业务逻辑在reduce()方法中

Reduce()方法是对相同的K的一组KV对 调用执行一次

Driver规范

创建提交YARN集群运行的JOB对象,其中封装了MapReduce程序运行所需要的相关参数:


输入数据路径

输出数据路径

Mapper

Reducer

也相当于一个YRAN集群的客户端,主要作用就是提交我们的MapReduce程序运行。


WordCount

需求介绍

下面是我做的一些操作,这里有一个 1.txt 文档,当中是一些文本内容。我们将对其做计算,统计出每个单词出现的频率。

root@hecs-393573:/opt/wzk# ls
wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -get /wzk/test/1.txt ./
root@hecs-393573:/opt/wzk# ls
1.txt  wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -ls /
Found 5 items

操作输出的结果 如下

实现步骤

  • 创建工程
  • 导入POM
  • 编写 Mapper
  • 编写 Reducer
  • 编写 Driver

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Hadoop Dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <!--maven打包插件 -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Mapper编写

按照上述的规范,我们编写一个Mapper出来

package icu.wzk.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer编写

同样,按照之前文章中的说的规范,编写一个 Reducer 出来。

Driver编写

package icu.wzk.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试 ===
        String inputPath = "wc.txt";
        String outputPath = "wc-out";
        // =======

        // 配置
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 运行的类
        job.setJarByClass(WordCountDriver.class);
        // Mapper
        job.setMapperClass(WordCountMapper.class);
        // Reducer
        job.setReducerClass(WordCountReducer.class);
        // Map Output Key Value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 最终 Key Value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 路劲参数
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // 等待结果
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

本地测试

在代码中,修改为本地的路径。

// === 测试 ===
String inputPath = "wc.txt";
String outputPath = "wc-out";
// =======

运行 Driver 代码之后,我们发现文件目录中生成了如下的结果

打开文本内容,我们可以看到如下的结果

目录
相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
167 6
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
33 3
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
58 3
|
7月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
84 1
|
7月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
224 0
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
64 1
|
6月前
|
数据采集 SQL 分布式计算
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
74 1
|
7月前
|
分布式计算 并行计算 搜索推荐
Hadoop MapReduce计算框架
【5月更文挑战第10天】HadoopMapReduce计算框架
54 3
|
6月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
232 0