Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

简介: Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

章节内容

上一节我们完成了:


新工程的建立 和 POM 的导入

Java连接到HDFS集群

Java操作HDFS集群,如上传下载,遍历目录,PUT GET 等等操作

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!

请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!


但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

MapReduce 图片介绍

Hadoop 序列化

为什么是Hadoop实现的序列化,而不是使用Java自带的?


序列化在分布式中非常重要,在Hadoop中,集群中多个节点之间的通信是通过RPC实现的,RPC将数据序列化为二进制的流发送到远程节点,远程节点接收到二进制的流数据之后再转换为原始的消息。

RPC可以更小的体积更快的速度。

Hadoop使用自己的Writable,它比Java的序列化更紧凑更快,一个对象使用序列化后,会携带额外的校验信息等等···

Mapper规范

用户自定义一个Mapper类继承Hadoop的Mapper类

Mapper的输入数据是KV的形式

Map阶段的业务逻辑定义子啊map()方法中

Mapper的输出数据是KV对的形式

Reducer规范

用户自定义Reducer类要继承Hadoop的Reducer类

Reducer的输入数据类型对应Mapper的数据类型

Reducer的业务逻辑在reduce()方法中

Reduce()方法是对相同的K的一组KV对 调用执行一次

Driver规范

创建提交YARN集群运行的JOB对象,其中封装了MapReduce程序运行所需要的相关参数:


输入数据路径

输出数据路径

Mapper

Reducer

也相当于一个YRAN集群的客户端,主要作用就是提交我们的MapReduce程序运行。


WordCount

需求介绍

下面是我做的一些操作,这里有一个 1.txt 文档,当中是一些文本内容。我们将对其做计算,统计出每个单词出现的频率。

root@hecs-393573:/opt/wzk# ls
wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -get /wzk/test/1.txt ./
root@hecs-393573:/opt/wzk# ls
1.txt  wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -ls /
Found 5 items

操作输出的结果 如下

实现步骤

  • 创建工程
  • 导入POM
  • 编写 Mapper
  • 编写 Reducer
  • 编写 Driver

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Hadoop Dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <!--maven打包插件 -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Mapper编写

按照上述的规范,我们编写一个Mapper出来

package icu.wzk.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer编写

同样,按照之前文章中的说的规范,编写一个 Reducer 出来。

Driver编写

package icu.wzk.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试 ===
        String inputPath = "wc.txt";
        String outputPath = "wc-out";
        // =======

        // 配置
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 运行的类
        job.setJarByClass(WordCountDriver.class);
        // Mapper
        job.setMapperClass(WordCountMapper.class);
        // Reducer
        job.setReducerClass(WordCountReducer.class);
        // Map Output Key Value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 最终 Key Value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 路劲参数
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // 等待结果
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

本地测试

在代码中,修改为本地的路径。

// === 测试 ===
String inputPath = "wc.txt";
String outputPath = "wc-out";
// =======

运行 Driver 代码之后,我们发现文件目录中生成了如下的结果

打开文本内容,我们可以看到如下的结果

目录
相关文章
|
16天前
|
XML 安全 Java
Java反射机制:解锁代码的无限可能
Java 反射(Reflection)是Java 的特征之一,它允许程序在运行时动态地访问和操作类的信息,包括类的属性、方法和构造函数。 反射机制能够使程序具备更大的灵活性和扩展性
25 5
Java反射机制:解锁代码的无限可能
|
2天前
|
Java
在 Java 中捕获和处理自定义异常的代码示例
本文提供了一个 Java 代码示例,展示了如何捕获和处理自定义异常。通过创建自定义异常类并使用 try-catch 语句,可以更灵活地处理程序中的错误情况。
|
12天前
|
jenkins Java 测试技术
如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例详细说明
本文介绍了如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例,详细说明了从 Jenkins 安装配置到自动构建、测试和部署的全流程。文中还提供了一个 Jenkinsfile 示例,并分享了实践经验,强调了版本控制、自动化测试等关键点的重要性。
42 3
|
18天前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
50 10
|
13天前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
12天前
|
Java
Java代码解释++i和i++的五个主要区别
本文介绍了前缀递增(++i)和后缀递增(i++)的区别。两者在独立语句中无差异,但在赋值表达式中,i++ 返回原值,++i 返回新值;在复杂表达式中计算顺序不同;在循环中虽结果相同但使用方式有别。最后通过 `Counter` 类模拟了两者的内部实现原理。
Java代码解释++i和i++的五个主要区别
|
20天前
|
搜索推荐 Java 数据库连接
Java|在 IDEA 里自动生成 MyBatis 模板代码
基于 MyBatis 开发的项目,新增数据库表以后,总是需要编写对应的 Entity、Mapper 和 Service 等等 Class 的代码,这些都是重复的工作,我们可以想一些办法来自动生成这些代码。
28 6
|
6月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
75 1
|
5月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
62 1
|
5月前
|
数据采集 SQL 分布式计算