利用采样器实现mapreduce任务输出全排序

简介: 采样器是hadoop内自带的一个可以对目标文件部分数据进行提取的工具类,以方便我们对这些采样的数据做一些参考或者处理。hadoop提供了多种采样器供我们使用,以满足不同的需求。另外,采样器不同于普通mapreduce操作。
采样器是hadoop内自带的一个可以对目标文件部分数据进行提取的工具类,以方便我们对这些采样的数据做一些参考或者处理。hadoop提供了多种采样器供我们使用,以满足不同的需求。另外,采样器不同于普通mapreduce操作。它是直接在客户端机器上运行的。
常见采样器
IntervalSampler 以一定的间隔定期从划分中选择key,对有排序的数据来说更好
RandomSameler 以指定的采样率均匀的从数据集中选择样本
SplitSampler 只采样一个分片的前n条记录,所以不适合有排序的数据

最简单的使用采样器举例
例1:
JobConf conf = new JobConf(SamplerTest.class);
conf.setJobName("Sampler");

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

//设置采样率为0.1,最大样本数1000,最大分区数10(在最多10个分区中,按照约总条数*0.1的间隔取出一个样本,最多1000个样本,注意每条提取的顺序也是被打乱了的),采样器在客户端运行
//理论上RandomSampler<K, V>中的k,v和inputformat中的map传入值类型一致,但是这里好像有个bug,在后面的sampler.getSample(inf, conf)中返回的list使用的toArrays()方法,这样返回的就只能是Object数组了,所以k只能为Object,否则会抛类转换异常。另外v在采样时没什么用  
RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
final InputFormat<Object,Text> inf = (InputFormat<Object,Text>) conf.getInputFormat();

//因为sampler.getSample(inf, conf)的返回类型由inf中的InputFormat中的K,V决定,所以这里也要转换成对应的K,V
//conf.getInputFormat()从配置中获得inputformat,若不配置,默认使用TextInputFormat(这里就没配置)
Object[] keys = sampler.getSample(inf, conf);
for(Object l:keys){
 LongWritable l2 = (LongWritable)l;
 System.out.println(l2);
}
return 0;
     上面的例子只是对采样器进行了简单的使用,从总条数*0.1的间隔取出一个样本,最多1000个样本,最大10个分区中取出样本。并将这些样本打印出来。注意,若这里面任意一个条件满足。则取样本操作结束。比如已经取到了1000个样本,或者已经取了10个分区时,这时就认为样本已提取完成。
这个例子可以看出两个问题:
1.该例子没有进行runJob,所以没有执行mapreduce任务。由此看来采样器的获取是在客户端中进行的
2.该例子得到的sample只能是key的sample,不能是value的sample。若要得到value的sample则需要自己重写该方法了。

利用样本来对输出内容进行全排序
使用hadoop对输出文件进行全排序有很多方法,他们各有优缺点,这里总结一下各个方法的优劣:
1.直接使用一个reduce,且输出一个文件。
    该操作的优点在于简单,无须进行任何排序操作,hadoop在进行reduce的时候就对key进行了内部排序。所以直接输出结果就醒了。但是它的缺点也是显而易见的,就是一旦数据量大了以后,采用一个reduce对数据进行处理显然没有达到分布式处理的效果,有违hadoop的操作宗旨。
2.自定义partitioner,按范围来区分多个reduce任务,输出多个文件,然后再将这些文件进行组合。
    该操作的优点在于能够有效的将大数据进行分布式处理,达到了一定程度的负载均衡。但是缺点是用户依赖较大。除了要自己写partitioner,用户还要要事先知道数据的取之范围,而且还要了解数据分布情况,若数据分布不均匀的时候也可能出现负载不均衡的情况。如一个包含了各地温度的数据。在-20~0度约有5%,0~20度约有80%,20~40度约有14%。若按照每20度来进行划分的话,显然不能达到均衡的效果。
3.采用hadoop提供的TotalOrderPartitioner分区工具进行全排序
    该操作实际上就是hadoop内部使用了采样器来对数据进行采样,然后内部按分布比例进行分区。整个动作已经帮开发者做好了,所以它的优点是能够最大限度的达到负载均衡。

方法1比较简单,下面分别对方法2和方法3做一个简要描述和举例:

利用方法2实现全排序:自定义partitioner的mapreduce任务来实现全排序

     采用hadoop提供的TotalOrderPartitioner分区工具实现全排序简单方便。但是有的时候我们可能需要增加一些自定义的东西来实现更通用的扩展。这个时候就有必要自己写一个类似的功能来实现全排序了。
全排序的思想无外乎就是先对数据进行采样,然后根据采样的数据进行分区。最后得到按指定分区好的并且已排序的数据。按照这个思想,我们就可以写mapreduce任务了。

例2:
1.编写mapper:mapper任务很简单,就是将获取到的数据直接派发给reduce,当然hadoop会合并这些相同的key。
public class MyMap extends MapReduceBase implements
  Mapper<LongWritable, Text, LongWritable, IntWritable> {
 @Override
 public void map(LongWritable key, Text val,
   OutputCollector<LongWritable, IntWritable> output, Reporter reporter)
   throws IOException {
  output.collect(key, new IntWritable(1));
 }

}
2.编写reducer:reducer的任务就是对已排好序的key进行处理并输出。
public class MyRed extends MapReduceBase implements
  Reducer<LongWritable, IntWritable, Text, NullWritable> {
 @Override
 public void reduce(LongWritable key, Iterator<IntWritable> values,
   OutputCollector<Text, NullWritable> output, Reporter reporter)
   throws IOException {
  while(values.hasNext()){
   values.next();
   output.collect(new Text(key.toString()), NullWritable.get());
  }
 }
}
3.编写partitioner:partitioner的作用就是将map任务的结果交给哪个reduce处理,这里需要用到已建立好缓存数据(该数据会在运行客户端主程序时)。该缓存数据告诉patitioner将哪些范围中的数据放到哪个reducer中去
public class MyPartitions implements Partitioner<LongWritable,IntWritable> {

 @Override
 public void configure(JobConf job) {
 }
 @Override
 public int getPartition(LongWritable key, IntWritable value,
   int numPartitions) {
  //PartitionsUtils.findPartitions的作用就是依据分布式缓存数据,按key进行分区,具体代码就不贴了,具体操作如下
  //获取分布式缓存文件中的数据,取得对应的数字,这些数字按从小到达顺序排列,返回其插值序数。比如:3,3,9。那么当key<3时返回0,key=3时返回1,3<key<=9时返回2,key>9时返回3。
  return PartitionsUtils.findPartitions(key,value,numPartitions);
 }
}
4.最后编写客户端主程序:运行mapreduce任务驱动程序,并产生分布式缓存数据
public class SamplerTest3 extends Configured implements Tool {

 public int run(String[] args) throws Exception {
  int reduceTasks = 4;
  JobConf conf = new JobConf(SamplerTest3.class);
  conf.setJobName("Sampler");

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  //MyTextInputFormat是自定义的一个InputFormat,该InputFormat与TextInputFormat的区别是key就是Long.parseLong(value.toString());
  conf.setInputFormat(MyTextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  conf.setOutputKeyClass(Text.class);
  //因为是返回原数据中已排好序的数字,而这些数据是来自key的,无须value,所以value类型为NullWritable即可
  conf.setOutputValueClass(NullWritable.class);
  
  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(IntWritable.class);

  conf.setReducerClass(MyRed.class);
  conf.setMapperClass(MyMap.class);
  
  conf.setNumReduceTasks(reduceTasks);
  conf.setPartitionerClass(MyPartitions.class);
  
  //进行采样
  RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
  final InputFormat<Object,Text> inf = (InputFormat<Object,Text>) conf.getInputFormat();
  Object[] keys = sampler.getSample(inf, conf);
  Path catchPath = new Path("/cache");
  URI partitionUri = new URI(catchPath.toString()+ "#myCache");

  FileSystem fs = FileSystem.get(conf);
  FSDataOutputStream out = fs.create(catchPath);
  //MakeCacheFile方法为写入分布式缓存文件内容。
  //内容如下:keys为取到的一系列的数字样本。这里将这些样本数据按从小到大进行排序,然后平分成n段,共reduceTasks段
  //取右端的数,存入分布式缓存中。最后一段不用存
  //比如采样的数字样本是1112|2222|2367|7788 分成了4段,取右端的数就是2,2,7,8。再去除最后一个数最终得2,2,7,存入分布式缓存中
  MakeCacheFile(out,keys,reduceTasks);
  IOUtils.closeStream(out);
  
  //添加到分布式缓存
  DistributedCache.addCacheFile(partitionUri, conf);
  DistributedCache.createSymlink(conf);
  
  JobClient.runJob(conf);
  return 0;
 }

 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new SamplerTest3(), args));
 }

}
利用方法3实现全排序:采用hadoop提供的TotalOrderPartitioner分区工具来实现全排序

     采用hadoop的TotalOrderPartitioner简化了开发,原理实际上和上面的例子类似,都是先采样,然后进行分区。但是我们无需关心如何划分partition,并且也不用关心分布式缓存如何生成和使用。只要提供对应的key的样本即可。下面这个例子就是使用了hadoop提供的分区工具。

例3:
public class SamplerTest4 extends Configured implements Tool {

 public int run(String[] args) throws Exception {
  JobConf conf = new JobConf(SamplerTest4.class);
  conf.setJobName("Sampler");

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  //MyTextInputFormat和例2一样
  conf.setInputFormat(MyTextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(NullWritable.class);
  
  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(IntWritable.class);

  //MyRed,MyMap和例2一样
  conf.setReducerClass(MyRed.class);
  conf.setMapperClass(MyMap.class);

  conf.setNumReduceTasks(4);
  conf.setPartitionerClass(TotalOrderPartitioner.class);

  
  RandomSampler<Object, Text> sampler = new RandomSampler<Object, Text>(0.1, 1000, 10);
  //告诉hadoop分布式缓存文件放在哪里好
  Path catchPath = new Path("/partitionFile");
  TotalOrderPartitioner.setPartitionFile(conf, catchPath);
  //自动生成缓存文件
  InputSampler.writePartitionFile(conf, sampler);

  URI partitionUri = new URI(catchPath.toString()+ "#_partitions");
  //添加到分布式缓存
  DistributedCache.addCacheFile(partitionUri, conf);
  DistributedCache.createSymlink(conf);
  JobClient.runJob(conf);
  return 0;
 }
 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new SamplerTest4(), args));
 }
}
     上面只是对有关全排序进行一个针对性的描述。实际使用中还可以进一步优化,比如采用压缩机制。或者自定义优化对key的排序,这样会时mapredoce任务运行效率更高。

相关文章
|
SQL 分布式计算 Hadoop
通过Job Committer保证Mapreduce/Spark任务数据一致性
通过对象存储系统普遍提供的Multipart Upload功能,实现的No-Rename Committer在数据一致性和性能方面相对于FileOutputCommitter V1/V2版本均有较大提升,在使用MapRedcue和Spark写入数据到S3/Oss的场景中更加推荐使用。
通过Job Committer保证Mapreduce/Spark任务数据一致性
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
34 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
46 1
|
5月前
|
数据采集 SQL 分布式计算
|
6月前
|
SQL 分布式计算 数据处理
【Hive】所有的Hive任务都会有MapReduce的执行吗?
【4月更文挑战第17天】【Hive】所有的Hive任务都会有MapReduce的执行吗?
|
分布式计算 Java Hadoop
70 Azkaban MAPREDUCE任务
70 Azkaban MAPREDUCE任务
38 0
|
分布式计算 负载均衡 调度
MapReduce实现日记-任务粒度如何把握
MapReduce实现日记-任务粒度如何把握
|
分布式计算 Java
Mapreduce执行机制之提交任务和切片原理
Mapreduce执行机制之提交任务和切片原理
103 0
|
分布式计算 Hadoop 分布式数据库
通过Job Committer保证Mapreduce/Spark任务数据一致性
并发地向目标存储系统写数据是分布式任务的一个天然特性,通过在节点/进程/线程等级别的并发写数据,充分利用集群的磁盘和网络带宽,实现高容量吞吐。并发写数据的一个主要需要解决的问题就是如何保证数据一致性的问题,本文主要介绍MapReduce/Spark如何通过Job Committer机制解决写数据一致性的问题,以及在OSS等对象存储上的解决方案。
471 0
|
SQL 数据采集 缓存
实践Hadoop MapReduce 任务的性能翻倍之路
eBay每天产生PB量级的CAL日志,其数据量每天都在增加。对于日益增长的数据量,Hadoop MapReduce job的优化将会大大节省计算资源。本文将分享eBay团队如何对这些Hadoop job进行优化,希望为开发者带来启发,解决Hadoop MapReduce(MR)job实践中存在的问题。
实践Hadoop MapReduce 任务的性能翻倍之路