《Hadoop与大数据挖掘》一2.4.4 MapReduce组件分析与编程实践

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介:

本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.4.4节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.4.4 MapReduce组件分析与编程实践

MapReduce整个流程包括以下步骤:输入格式(InputFormat)、Mapper、Combiner、Partitioner、Reducer、输出格式(OutputFormat)。这里会针对流程中的Combiner、Part-itioner、输入/输出格式进行分析,同时,也会介绍相关的编程技巧,如自定义键值对。
1. Combiner分析
Combiner是什么呢?从字面意思理解,Combine即合并。其实,Combiner就是对Mapper的输出进行一定的合并,减少网络输出的组件。所以,其去掉与否不影响最终结果,影响的只是性能。
Combiner是Mapper端的汇总,然后才通过网络发向Reducer。如图2-40所示,经过Combiner后,键值对,被合并为,这样发往Reducer的记录就可以减少一条(当然,实际中肯定不是只减少一条记录),从而减少了网络IO。
对于多个输入数据块,每个数据块产生一个InputSplit,每个InputSplit对应一个map任务,每个map任务会对应0个到多个Combiner,最后再汇总到Reducer。在单词计数的例子中,使用Combiner的情形如图2-41所示。


image


需要注意的是,自定义Combiner也是需要集成Reducer的,同样也需要在reduce函数中写入处理逻辑。但是要注意,Combiner的输入键值对格式与输出键值对格式必须保持一致,也正是因为这个要求,很多情况下,采用自定义Combiner的方式在业务或算法处理上行不通。还有,在单词计数程序中,Combiner和Reducer使用的是同一个类代码,这是可能的,但是大多数情况下不能这样做,因为Reducer和Combiner的逻辑在很多情况下是不一样的。
2. Partitioner分析
Partitioner是来做什么的呢?是用来提高性能的吗?非也!Partitioner主要的目的是把键值对分给不同的Reducer。分给不同的Reducer?难道Reducer可以有多个吗?这是当然的,只需要在初始化Job实例的时候进行设置即可,例如设置代码为job.setNum-ReduceTasks(3),这样就可以设置3个Reducer了。
经过前面的分析可以知道,在Reducer的输入端,其键值对组是按照一个键对应一个值列表的。如果同一个键的不同值被发送到了不同的Reducer中,那么(注意,每个Reducer在一个子节点运行,不同Reducer之间不会干扰),经过不同的Reducer处理后,其实我们已经做不到针对一个键,输出一个值了,而是输出了两条记录。我们可以看下Hadoop系统默认的Partitioner实现,默认的Partitioner是HashPartitioner,其源码如代码清单2-30所示。

代码清单2-30 HashPartitioner源码
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

在源码中,可以看到HashPartitiner中只有一个方法,就是getPartition(K key,V value, int numReducTasks)。3个参数分别为键、值、Reducer的个数,输出其实就是Reducer的ID。从代码的实现中可以看出,最终输出的Reducer ID只与键(key)的值有关,这样也就保证了同样的键会被发送到同一个Reducer中处理。
同一个键的记录会被发送到同一个Reducer中处理,一个Reducer可以处理不同的键的记录。

3.输入输出格式/键值类型
一般来说,HDFS一个文件对应多个文件块,每个文件块对应一个InputSplit,而一个InputSplit就对应一个Mapper任务,每个Mapper任务在一个节点上运行,其仅处理当前文件块的数据,但是我们编写Mapper的时候只是关心输入键值对,而不是关心输入文件块。那么,文件块怎么被处理成了键值对呢?这就是Hadoop的输入格式要做的工作了。
在InputFormat中定义了如何分割以及如何进行数据读取从而得到键值对的实现方式,它有一个子类FileInputFormat,如果要自定义输入格式,一般都会集成它的子类File-InputFormat,它里面帮我们实现了很多基本的操作,比如记录跨文件块的处理等。
图2-42所示是InputFormat的类继承结构。
然而,比较常用的则是如表2-7所示的几个实现方式。
同理,可以想象,输出格式(OutputFormat)也与输入格式相同,不过是输入格式的逆过程:把键值对写入HDFS中的文件块中。如图2-43所示是OutputFormat的类继承结构。


image



image


同样,比较常用的方式如表2-8所示。

image


image


在Hadoop中,无论是Mapper或Reducer处理的都是键值对记录,那么Hadoop中有哪些键值对类型呢?Hadoop中常用的键值对类型如图2-44所示。


image


从各个类的命名上其实也可以看出其代表什么类型,比如LongWritable,代表的就是Long的实现,而Text就是String的实现。在前面的单词计数中我们使用过IntWritable以及Text。
这里有两点需要注意:
1)值类型都需实现Writale接口;
2)键需要实现WritableComparable接口。
其实从图2-44中也可以看出,Hadoop已有的键值类型都是实现WritableComparable接口的,然而WritableComparable接口又是实现Writable接口的。所以,Hadoop已有的键值类型既可以作为键类型也可以作为值类型。作为键类型的肯定可以作为值类型,但作为值类型的却不能作为键类型。为什么键类型是实现WritableComparable接口呢?其实,如果你联想到了Shuffle/Sort过程的话,应该不难理解,因为MapReduce框架需要在这里对键进行排序。
4.动手实践:指定输入输出格式
这个实验主要是加深理解Hadoop的输入/输出格式,熟悉常用的SequenceFileInput-Format和SequenceFileOutputFormat。
实验步骤:
1)打开Eclipse,打开已经完成的WordCount程序;
2)设置输出格式为SequenceFileOutputFormat,重新打包,并提交到Linux上运行;
3)查看输出的文件;
4)再次修改WordCount程序,设置输入格式为SequenceFileInputFormat、输入路径为3的输出;设置输出格式为TextFileInputFormat;
5)查看输出结果;
6)针对上面的各个步骤以及输出进行分析,解释对应的输出结构。
思考:
1)第4步中查看的文件是否是乱码?如果是乱码,为什么是乱码?针对这样的数据,如何使用HDFS Java API进行读取?如果不是乱码,看到的是什么?
2)使用SequenceFileInputFormat或SequenceFileOutputFormat有什么优势与劣势?
5.自定义键值类型
Hadoop已经定义了很多键值类型,比如Text、IntWritable、LongWritable等,那为什么需要用到自定键值类型呢?答案其实很简单,不够用。在有些情况下,我们需要一些特殊的键值类型来满足我们的业务需求,这种时候就需要自定义键值类型了。前面已经提到,自定义键需要实现WritableComparable接口,自定义值需要实现Writable接口,那么实现了接口后,还需要做哪些操作呢?
自定义值类型可参考代码清单2-31进行分析。
**代码清单2-31 自定义Hadoop 值类型
public class MyWritable implements Writable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {

 out.writeInt(counter);
 out.writeLong(timestamp);

}
@Override
public void readFields(DataInput in) throws IOException {

 counter = in.readInt();
 timestamp=in.readLong();

}**
}
在代码清单2-31中,首先实现了Writable接口,接着定义了两个变量。这两个变量其实是与业务相关的(比如,这里定义了一个counter,一个timestamp)。实现了Writable接口后,需要覆写两个方法(write和readFields),这里需要注意写入和读取的顺序是很重要的,比如这里先把counter写入out输出流,再把timestamp写入out输出流。那么,在读取的时候就需要先读取counter,再读取timestamp(如果两个变量都是int型,那么就更加需要注意区分)。
自定义键类型可参考代码清单2-32进行分析。
**代码清单2-32 自定义Hadoop 键类型
public class MyWritableComparable implements WritableComparable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {

 out.writeInt(counter);
 out.writeLong(timestamp);

}
@Override
public void readFields(DataInput in) throws IOException {

 counter = in.readInt();
 timestamp= in.readLong();

}
@Override
public int compareTo(MyWritableComparable other) {

 if(this.counter == other.counter){
     return (int)(this.timestamp - other.timestamp);
 }
 return this.counter-other.counter;

}
}**
从代码清单2-32中可以看出,自定义键类型其实就是比自定义值类型多了一个比较方法而已,其他都是一样的。
6.动手实践:自定义键值类型
针对source/hadoop/keyvalue.data数据求解每行数据的个数以及平均值,该数据格式如表2-9所示。

image

1)编写Driver程序,main函数接收两个参数和,设置输入格式为KeyValueInputFormat;
2)编写Mapper程序,map函数针对每个value值,使用‘t’进行分隔;接着,对分隔后的数据进行求和以及个数统计(注意将字符串转换为数值),输出平均值和个数,Mapper输出键值对类型为;
3)编写自定义value类型MyValue,定义两个字段,一个是average,一个是num,用于存储平均值和个数;重写toString方法;
4)编写Reducer程序,直接输出即可;
5)对编写的程序进行打包averagejob.jar;
6)上传source/hadoop/keyvalue.data到HDFS,上传averagejob.jar到Linux;
7)使用命令hadoop jar averagejob.jar进行调用;
8)查看输出结果。
思考:
1)Reducer类是否必需?如果不需要,则如何修改?如果去掉reducer,输出结果会有什么不一样?
2)如果想让程序可以直接在Eclipse中运行,应该如何修改程序?

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
3天前
|
SQL 存储 消息中间件
vivo基于Paimon的湖仓一体落地实践
本文整理自vivo互联网大数据专家徐昱在Flink Forward Asia 2024的分享,基于实际案例探讨了构建现代化数据湖仓的关键决策和技术实践。内容涵盖组件选型、架构设计、离线加速、流批链路统一、消息组件替代、样本拼接、查询提速、元数据监控、数据迁移及未来展望等方面。通过这些探索,展示了如何优化性能、降低成本并提升数据处理效率,为相关领域提供了宝贵的经验和参考。
275 3
vivo基于Paimon的湖仓一体落地实践
|
5天前
|
数据采集 机器学习/深度学习 数据可视化
探索大数据分析的无限可能:R语言的应用与实践
探索大数据分析的无限可能:R语言的应用与实践
36 9
|
7天前
|
SQL 分布式计算 运维
StarRocks 在爱奇艺大数据场景的实践
本文介绍了爱奇艺大数据OLAP服务负责人林豪在StarRocks年度峰会上的分享,重点讲述了爱奇艺OLAP引擎的演进及引入StarRocks后的显著效果。在广告业务中,StarRocks替换Impala+Kudu后,接口性能提升400%,P90查询延迟缩短4.6倍;在“魔镜”数据分析平台中,StarRocks替代Spark达67%,P50查询速度提升33倍,P90提升15倍,节省4.6个人天。未来,爱奇艺计划进一步优化存算一体和存算分离架构,提升整体数据处理效率。
StarRocks 在爱奇艺大数据场景的实践
|
7天前
|
SQL 分布式计算 数据挖掘
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
川航选择引入 SelectDB 建设湖仓一体大数据分析引擎,取得了数据导入效率提升 3-6 倍,查询分析性能提升 10-18 倍、实时性提升至 5 秒内等收益。
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
|
9天前
|
运维 自然语言处理 算法
云栖实录 | 大模型在大数据智能运维的应用实践
云栖实录 | 大模型在大数据智能运维的应用实践
|
9天前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
11天前
|
机器学习/深度学习 数据采集 分布式计算
大数据分析中的机器学习基础:从原理到实践
大数据分析中的机器学习基础:从原理到实践
51 3
|
19天前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
128 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
2月前
|
数据采集 存储 机器学习/深度学习
数据的秘密:如何用大数据分析挖掘商业价值
数据的秘密:如何用大数据分析挖掘商业价值
64 9
|
2月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
111 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!