开发者社区> xiaohei.info> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

MapReduce高级特性

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/51366342 计数器 ...
+关注继续查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/51366342

计数器

因为计数器的查看往往比查看集群日志来的方便快捷
所以有些情况下计数器信息比集群日志更加有效

用户自定义的计数器

关于Hadoop的内置计数器的介绍可以参考Hadoop权威指南第九章MapReduce Features中的Build-in Counts小节
这里限于篇幅不再说明

MapReduce允许用户在程序中使用枚举或者字符串的格式类自定义计数器
一个作业可以定义的计数器不限,使用枚举类型时
枚举类型的名称即为组名,枚举类型的字段即为计数器名
计数器是全局的,会跨越所有Mapper和Reducer进行使用,并在作业结束的时候产生一个结果

例如,现有枚举类型如下:

enum Temperature{
    MISSING,
    MALFORMAT
}

在MapReduce程序中可以这样来使用计数器:

context.getCounter(Temperature.MISSING).increment(1);
context.getCounter(Temperature.MALFORMAT).increment(1);

动态计数器

由于枚举类型在编译的时候就确定了所有字段,但是某些情况下我们可能要根据未知的名称来命名计数器
这个时候就可以使用动态计数器来实现:

context.getCounter("计数器组名","计数器名").increment(1);

这里的计数器名的获得方式可以是任意的,例如动态获取的字段值等
但是大部分情况下,枚举类型可以足够使用了,而且枚举类型阅读性较强,易于使用,而且是类型安全的
所以推荐尽可能的使用枚举类型

在代码中获取计数器的值

除了通过Web UI、CLI和-counter参数获得作业的计数器,用户也可以通过代码在程序中获取计数器的值:

String jobId = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobId.forName(jobId));
if(job == null){
    System.err.println("No job whih ID %s found",jobId);
    return -1;
}
if(!job.isComplete()){
    System.err.println("Job %s is not complete",jobId);
    return -1;
}
Counters counters = job.getCounters();
//关键代码
long missing = conters.findCounter(Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

排序

部分排序

部分排序是指在map阶段,对每个分区中的数据进行排序的过程

Hadoop提交作业自定义排序和分组中可以看到
MapReduce中控制部分排序的方法不只有一种,控制排序的顺序如下:

1.如果设置了mapreduce.job.output.key.comparator.class属性或者setComparatorClass()方法,则使用设置的类进行部分排序
2.否则,键必须是WritableComparable的子类,并使用针对该键类型的已经注册的comparator
3.否则,使用RawComparator将字节流反序列化为对象,并调用WritableComparable的comparaTo()方法

我们在自定义数据类型的时候继承自WritableComparable,并重写了comparaTo方法,这里的设置是最后才会使用的
如果定义了RawComparator/WritableComparator的具体实现类,那么将会优先使用这个设置,因为其可以直接对比字节流数组

全排序

MapReduce Shuffle阶段的排序只针对各个单独的分区,也就是之前讨论到的部分排序
对于每个分区,其数据是有序的,但是从数据的总体来看,是无序的
如何让MapReduce产生全局有序的数据呢?
最简单的办法是只使用一个分区,但是这就丧失了MapReduce并行计算的特性,必须在单台机器上处理所有数据

事实上,除了使用一个分区,还有另外一种方式既可以实现全局有序,也可以充分利用到MapReduce的并行计算能力
但是这个方法需要做一些额外的工作

思考一下,在部分排序中,每个分区内的数据都是有序的,但是从分区的角度看就是无序的了
如果我们能够确保分区也是有序的呢?,例如分区1保存1-100的数据,分区2保存101-200的数据,一次类推
那么从分区的角度看,各个分区之间是有序的,而分区内部的数据也是自然有序的
从而就做到了数据的全局有序

但是在这个过程中需要注意一个情况:如何确保每个分区的数据量分配是均匀的?
因为在实际场景中,1-100中包含的数据可能有1000个,而101-200的数据只有50个,这就造成了数据倾斜的问题

为了解决这个问题,我们通常需要深入的了解数据的组成特性
但是在海量数据的情况下,不可能对全部数据进行检查
这时我们可以使用采样的方式来进行

采样的核心思想是只查看一小部分的键,获得键的近似分布由此构建分区

Hadoop中已经内置了若干的采样器,接口如下:

public interface Sampler<K,V>{
    K[] getSample(InputFormat<K,V> inf,Job job) throw IOException,InterruptedException;
}

但是通常不会直接使用这个getSample接口,而是由InputSampler的writePartitionFile方法调用
目的是创建一个SequenceFile来存储定义分区的键

public static <K,V> writePartitionFile(Job job,Sampler<K,V> sampler) throw IOException,ClassNotFoundException,InterruptedException

该SequenceFile会被TotalOrderPartitioner使用来为作业创建分区:

//设置分区类为TotalOrderPartitioner
job.setPartitionerClass(TotalOrderPartitioner.class);
//使用随机采样器,采样率为0.1,最大样本数和最大分区数为10000何10,任意一个条件满足之后即刻停止采样
InputSampler.Sampler<IntWritable,Text> sampler = new InputSampler.RandomSampler<IntWritable,Text>(0.1,10000,10);
//使用该采样器创建定义分区键的SequenceFile
InputSampler.writePartitionFile(job,sampler);
///获得该SequenceFile并加入分布式缓存中共享
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI uri = new URI(partitionFile);
jov.addCacheFile(uri);

这个采样器将会运行在客户端,所以会从集群上下载数据,需要注意下载的数据量不要太大不然运行时间很久
使用该方法还可以自由的设置reducer的任务数,即分区数,通过mapreduce.job.reducers来设置最后需要产生多少个均匀的分区

RandomSampler是一种比较通用的采样器,除了它,还有另外一些例如:

  • SplitSampler:只采样一个分片中的前n条记录,没有从全部分片中广泛采样,所以不适合已经排好序的数据
  • IntervalSampler:以一定的间隔从分片中选择键,因此很适合排过序的数据

二次排序

二次排序即为对数据的值进行排序,其实在Hadoop I/O的序列化小节中
就已经讨论过这个问题了,具体案例可以参考:Hadoop提交作业自定义排序和分组

Join连接

使用MapReduce进行连接操作的方式和技巧取决于数据集的规模和结构
如果一个数据集很大,另外一个很小,完全可以使用MapReduce中的DistributedCache
将小数据集分发到各个节点上

如果两个数据集都很大,那么又可以分为Map端的Join和Reduce端的Join

Map端的Join

Map端的Join操作会在数据到达map函数之前执行
为了达到这个目的,Map端的输入数据必须:

1.两个数据集被划分为数量相同的分区
2.两个数据集按照相同的键进行排序

由于Map可以设置之前执行的多个作业的输出为其输入,按照以上条件
此时输入数据应该满足:

1.两个作业有相同的reduce数量
2.键是相同的且不可分割

满足Map端Join操作的要求之后,可以利用org.apache.hadoop.mapreduce.join包中的ComsiteInputFormat类在map函数之前执行join操作

Reduce端的Join

比起Map端,Reduce端的Join对数据的要求没有那么高,利用Shuffle相同键的记录会被输入到同一个reducer(分区)中的特性
Reducer端可以天然进行Join操作,但是由于数据要经过Shuffle过程,所以效率往往比Map端的Join要低

而且在Reduce端的Join中,还可以利用到之前讨论的二次排序
有时候join连接需要一个数据集先于另一个数据集到达reduce函数,这时候我们可以听过二次排序对数据的值做一个标号
先要达到的数据标号设置为0,另外一个数据集设置为1,然后根据这个标号进行排序就可以实现让想要的数据集先一步到达reduce

边数据分布

所谓的边数据(Side Data)可以理解为MapReduce作业执行过程中
所有任务都有可能要使用到的只读的的数据,用以辅助处理主数据

使用JobConfiguration

Configuration类的各种setter方法可以方便的设置一些键值对类型的数据
用户可以通过getConfiguration方法获得配置的信息

这种方式足以应对很多只需要设置一些属性的场合
但是其缺点是:

  • 只适合类似属性设置的小数据
  • 对于很复杂的对象,用户需要自己设置序列化和反序列化
  • 每次读取配置的时候所有设置都将读取内存,不管有没有用到

DistributedCache

分布式缓存机制在作业运行之前将用户设置的数据拷贝到各个节点中以供使用
缓存的容量大小默认为10G,可以通过yarn.nodemanager.localizer.cache.target-size-mb来配置(以字节为单位)

具体的使用方式参考:MapReduce中的DistributedCache

作者:@小黑

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【Druid】(七)E-MapReduce 增强型 Druid 入门1
【Druid】(七)E-MapReduce 增强型 Druid 入门1
45 0
E-MapReduce 4.0产品新特性
E-MapReduce是运行在阿里云平台上的一大数据处理的系统解决方案。在2019年10月,阿里巴巴将发布EMR4.0版本。本篇介绍EMR4.0的新特性,包括在EMR基础能力,技术栈,生态集成和数据迁移等方面的升级,EMR4.0为用户提供更高的计算性能和更低的产品价格,将技术的红利让给用户。
5846 0
使用资源编排服务(ROS)轻松玩转E-MapReduce(EMR)
前言 如果您还没有听说过资源编排服务(ROS),那么恭喜您,本文将带您走进一个新的世界,学习资源管理的新姿势。 当您在使用E-MapReduce(EMR)时,是否想过使用OpenAPI或者是SDK来创建实例?是否为太多的参数感到困扰,为不一样的参数名称形态看到困惑?尤其是要通过代码管理整个实例的生命周期感到烦恼? 但您本不该为此烦恼,不是吗? 其实您只是想创建若干个资源,您不想关心应该调用
1014 0
E-MapReduce上如何升级EMR-Core
本文档将介绍如何将老集群的EMR-Core升级到最新版本 什么是EMR-Core EMR-Core是E-MapReduce集群上支持Hadoop生态组件读写OSS的依赖包。它提供一种高效地读写OSS数据的实现,并不断地在优化中。
2366 0
MapReduce(一)
分布式并行编程框架 特点:     MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce•编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算;     •MapReduce采用“分...
1071 0
MapReduce程序开发
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/51325813 通过AP...
847 0
如何在E-MapReduce中玩转OSS
在E-MapReduce中,用户可以将OSS作为Hadoop/Spark的可选数据源之一。但是在实际使用时,我们发现Hadoop读写OSS的性能不令人满意。为了解决这个问题,E-MapReduce团队对Hadoop的底层实现进行了优化,使得OSS数据源能够更好地适配Hadoop/Spark。
7219 0
MapReduce
 PS:内容来自开源力量公开课第二十四期-为何Hadoop是分布式大数据处理的未来&如何掌握Hadoop?的文档,算做简单了解,想花时间好好了解hadoop!   MapReduce原理1 问题: 求出以下数组当中最大的数 1,3,23,3,4,18,2,8,10,16,7,5 int Max(int a[], n) { int m=0; for(
1112 0
+关注
163
文章
1
问答
文章排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载