Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

简介: Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容

章节内容

上一节我们完成了:


新工程的建立 和 POM 的导入

Java连接到HDFS集群

Java操作HDFS集群,如上传下载,遍历目录,PUT GET 等等操作

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!

请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!


但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

MapReduce 图片介绍

Hadoop 序列化

为什么是Hadoop实现的序列化,而不是使用Java自带的?


序列化在分布式中非常重要,在Hadoop中,集群中多个节点之间的通信是通过RPC实现的,RPC将数据序列化为二进制的流发送到远程节点,远程节点接收到二进制的流数据之后再转换为原始的消息。

RPC可以更小的体积更快的速度。

Hadoop使用自己的Writable,它比Java的序列化更紧凑更快,一个对象使用序列化后,会携带额外的校验信息等等···

Mapper规范

用户自定义一个Mapper类继承Hadoop的Mapper类

Mapper的输入数据是KV的形式

Map阶段的业务逻辑定义子啊map()方法中

Mapper的输出数据是KV对的形式

Reducer规范

用户自定义Reducer类要继承Hadoop的Reducer类

Reducer的输入数据类型对应Mapper的数据类型

Reducer的业务逻辑在reduce()方法中

Reduce()方法是对相同的K的一组KV对 调用执行一次

Driver规范

创建提交YARN集群运行的JOB对象,其中封装了MapReduce程序运行所需要的相关参数:


输入数据路径

输出数据路径

Mapper

Reducer

也相当于一个YRAN集群的客户端,主要作用就是提交我们的MapReduce程序运行。


WordCount

需求介绍

下面是我做的一些操作,这里有一个 1.txt 文档,当中是一些文本内容。我们将对其做计算,统计出每个单词出现的频率。

root@hecs-393573:/opt/wzk# ls
wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -get /wzk/test/1.txt ./
root@hecs-393573:/opt/wzk# ls
1.txt  wordcount.txt
root@hecs-393573:/opt/wzk# hadoop fs -ls /
Found 5 items

操作输出的结果 如下

实现步骤

  • 创建工程
  • 导入POM
  • 编写 Mapper
  • 编写 Reducer
  • 编写 Driver

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Hadoop Dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
    </dependencies>

    <!--maven打包插件 -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Mapper编写

按照上述的规范,我们编写一个Mapper出来

package icu.wzk.demo02;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer编写

同样,按照之前文章中的说的规范,编写一个 Reducer 出来。

Driver编写

package icu.wzk.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试 ===
        String inputPath = "wc.txt";
        String outputPath = "wc-out";
        // =======

        // 配置
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 运行的类
        job.setJarByClass(WordCountDriver.class);
        // Mapper
        job.setMapperClass(WordCountMapper.class);
        // Reducer
        job.setReducerClass(WordCountReducer.class);
        // Map Output Key Value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 最终 Key Value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 路劲参数
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // 等待结果
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

本地测试

在代码中,修改为本地的路径。

// === 测试 ===
String inputPath = "wc.txt";
String outputPath = "wc-out";
// =======

运行 Driver 代码之后,我们发现文件目录中生成了如下的结果

打开文本内容,我们可以看到如下的结果

目录
相关文章
|
13天前
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
58 6
|
7月前
|
XML 存储 分布式计算
【赵渝强老师】史上最详细:Hadoop HDFS的体系架构
HDFS(Hadoop分布式文件系统)由三个核心组件构成:NameNode、DataNode和SecondaryNameNode。NameNode负责管理文件系统的命名空间和客户端请求,维护元数据文件fsimage和edits;DataNode存储实际的数据块,默认大小为128MB;SecondaryNameNode定期合并edits日志到fsimage中,但不作为NameNode的热备份。通过这些组件的协同工作,HDFS实现了高效、可靠的大规模数据存储与管理。
671 70
|
9月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
282 7
|
12月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
448 6
|
12月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
170 2
|
12月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
201 1
|
12月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
287 0
|
12月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
140 0
|
12月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
191 0
|
12月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
172 3

热门文章

最新文章