大数据MapReduce统计单词实例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据MapReduce统计单词实例

1 梳理单词计数的执行流程

上面的是单个文件的执行流程,有一些现象看起来还是不明显 下面我们来看一个两个文件的执行流程




2 实战WordCount

前面我们通过理论层面详细分析了单词计数的执行流程,下面我们就来实际上手操作一下。

大致流程如下:

第一步:开发Map阶段代码

第二步:开发Reduce阶段代码

第三步:组装Job

在idea中创建WordCountJob类

添加注释,梳理一下需求:

需求:读取hdfs上的hello.txt文件,计算文件中每个单词出现的总次数

hello.txt文件内容如下:

hello you

hello me

最终需要的结果形式如下:

hello 2

me 1

you 1

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountJob {
    /** * 创建自定义mapper类 */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /** * 需要实现map函数 
        * 这个map函数就是可以接收k1,v1, 产生k2,v2 
        * * * @param k1 * @param v1 * @param context 
        * * @throws IOException * @throws InterruptedException 
        * */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
            // 对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            for (String word : words) {
                // 迭代切割出来的单词数据
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                // 把<k2,v2>写出去 context.write(k2,v2);
                context.write(k2, v2);
            }
        }
    }
    /** * 创建自定义reducer类 */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /** * 针对<k2,{v2……}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 * 
        * @param k2 
        * * @param v2s 
        * * @param context 
        * * @throws IOException 
        * * @throws InterruptedException 
        * */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            context.write(k3, v3);
        }
    }
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                // 如果传递的参数不够,程序直接退出
                System.exit(100);
            }
            // job需要的配置参数
            Configuration conf = new Configuration();
            // 创建一个job
            Job job = Job.getInstance(conf);
            // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类
            job.setJarByClass(WordCountJob.class);
            // 指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // 指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 指定map相关的代码
            job.setMapperClass(MyMapper.class);
            // 指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            // 指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);
            // 指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            // 指定k3的类型
            job.setOutputKeyClass(Text.class);
            // 指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

现在代码开发完毕了,现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类中,主要是为了在学习阶段看起来清晰一些,所有的代码都在一个类中,好找,其实我们完全可以把自定义的mapper类和reducer类单独提出去,定义为单独的类,是没有什么区别的。

ok,那代码开发好了以后想要执行,我们需要打jar包上传到集群上去执行,这个时候需要在pom文件中添加maven的编译打包插件。

<build>
        <!-- compiler插件, 设定JDK版本 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <!--打包插件-->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>  //在运行的时候可以动态指定
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
 </build>

注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-client和log4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
            <scope>provided</scope>
        </dependency>
</dependencies>

添加好了以后就可以打包了,建议在windows的cmd命令行下cd到项目根目录,然后执行mvn编译打包命令,看到最后输出的BUILD SUCCESS就说明执行成功了


命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的那个jar包。

D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar

把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。

注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式

我把这个jar包上传到了bigdata01机器的/usr/local/hadoop-3.2.0目录下了,

在向集群中正式提交任务jar包之前需要先把测试数据准备好

在本地创建一个hello.txt文件,内容是

[root@bigdata01 hadoop-3.2.0]# vi hello.txt 
hello you
hello me

单词中间用空格隔开,因为我们在MapReduce代码中是使用空格进行切割单词的。

然后把hello.txt上传到hdfs的test目录下

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -mkdir /test 
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -put hello.txt /test 
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /test 
Found 1 items 
-rw-r--r-- 2 root supergroup 19 2020-04-22 11:16 /test/hello.txt复制代码

接下来就可以向集群提交MapReduce任务了

具体的命令是这样的

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out
yarn jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out

hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop脚本

jar:表示执行jar包

db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息

com.cjt.mr.WordCountJob:指定要执行的mapreduce代码的全路径

/test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。

/out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据

任务提交到集群上面之后,可以在shell窗口中看到如下日志信息,最终map执行到100%,reduce执行到100%,说明任务执行成功了

2020-04-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 
2020-04-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 
2020-04-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%

3 web界面中查看任务执行情况

访问 http://bigdata01:8088

那我们来查看一下任务输出的结果,

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out 
Found 2 items 
-rw-r--r-- 2 root supergroup 0 2020-04-22 15:13 /out/_SUCCESS 
-rw-r--r-- 2 root supergroup 19 2020-04-22 15:13 /out/part-r-00000


还要一点需要注意的,part后面的r表示这个结果文件是reduce步骤产生的,如果一个mapreduce只有map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
74 2
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
46 3
|
4月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
75 1
|
4月前
|
分布式计算 大数据 Hadoop
MapReduce:大数据处理的基石
【8月更文挑战第31天】
150 0
|
4月前
|
机器学习/深度学习 分布式计算 算法
MaxCompute 的 MapReduce 与机器学习
【8月更文第31天】随着大数据时代的到来,如何有效地处理和分析海量数据成为了一个重要的课题。MapReduce 是一种编程模型,用于处理和生成大型数据集,其核心思想是将计算任务分解为可以并行处理的小任务。阿里云的 MaxCompute 是一个面向离线数据仓库的计算服务,提供了 MapReduce 接口来处理大规模数据集。本文将探讨如何利用 MaxCompute 的 MapReduce 功能来执行复杂的计算任务,特别是应用于机器学习场景。
95 0
|
5月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用合集之如何实现类似mysql实例中的数据库功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
分布式计算 资源调度 DataWorks
MaxCompute产品使用合集之如何增加MC中Fuxi任务的实例数
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
分布式计算 大数据 MaxCompute
MaxCompute产品使用合集之如何实现根据商品维度统计每件商品的断货时长的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
存储 分布式计算 算法
"揭秘!MapReduce如何玩转压缩文件,让大数据处理秒变‘瘦身达人’,效率飙升,存储不再是烦恼!"
【8月更文挑战第17天】MapReduce作为Hadoop的核心组件,在处理大规模数据集时展现出卓越效能。通过压缩技术减少I/O操作和网络传输的数据量,不仅提升数据处理速度,还节省存储空间。支持Gzip等多种压缩算法,可根据需求选择。示例代码展示了如何配置Map输出压缩,并使用GzipCodec进行压缩。尽管压缩带来CPU负担,但在多数情况下收益大于成本,特别是Hadoop能够自动处理压缩文件,简化开发流程。
73 0
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之执行多条SQL语句时,使用同一个实例来运行,遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。