[Hadoop]MapReduce多输出

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/53486203 FileOutputFormat及其子类产生的文件放在输出目录下。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/53486203

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制或让每个reducer输出多个文件。MapReduce为此提供了MultipleOutputFormat类。

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的mapper)创建多个文件。采用name-r-nnnnn形式的文件名用于map输出,name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指名块号的整数(从0开始)。块号保证从不同块(mapper或者reducer)写的输出在相同名字情况下不会冲突。

1. 重定义输出文件名

我们可以对输出的文件名进行控制。考虑这样一个需求:按男女性别来区分度假订单数据。这需要运行一个作业,作业的输出是男女各一个文件,此文件包含男女性别的所有数据记录。

这个需求可以使用MultipleOutputs来实现:

 
  
  1. package com.sjf.open.test;
  2. import java.io.IOException;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.conf.Configured;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.NullWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.io.compress.CompressionCodec;
  11. import org.apache.hadoop.io.compress.GzipCodec;
  12. import org.apache.hadoop.mapred.JobPriority;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  20. import org.apache.hadoop.util.Tool;
  21. import org.apache.hadoop.util.ToolRunner;
  22. import com.sjf.open.utils.ConfigUtil;
  23. /**
  24. * Created by xiaosi on 16-11-7.
  25. */
  26. public class VacationOrderBySex extends Configured implements Tool {
  27. public static void main(String[] args) throws Exception {
  28. int status = ToolRunner.run(new VacationOrderBySex(), args);
  29. System.exit(status);
  30. }
  31. public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {
  32. public String fInputPath = "";
  33. @Override
  34. protected void setup(Context context) throws IOException, InterruptedException {
  35. super.setup(context);
  36. fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
  37. }
  38. @Override
  39. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  40. String line = value.toString();
  41. if(fInputPath.contains("vacation_hot_country_order")){
  42. String[] params = line.split("\t");
  43. String sex = params[2];
  44. if(StringUtils.isBlank(sex)){
  45. return;
  46. }
  47. context.write(new Text(sex.toLowerCase()), value);
  48. }
  49. }
  50. }
  51. public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
  52. private MultipleOutputs<NullWritable, Text> multipleOutputs;
  53. @Override
  54. protected void setup(Context context) throws IOException, InterruptedException {
  55. multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  56. }
  57. @Override
  58. protected void reduce(Text key, Iterable<Text> values, Context context)
  59. throws IOException, InterruptedException {
  60. for (Text value : values) {
  61. multipleOutputs.write(NullWritable.get(), value, key.toString());
  62. }
  63. }
  64. @Override
  65. protected void cleanup(Context context) throws IOException, InterruptedException {
  66. multipleOutputs.close();
  67. }
  68. }
  69. @Override
  70. public int run(String[] args) throws Exception {
  71. if (args.length != 2) {
  72. System.err.println("./run <input> <output>");
  73. System.exit(1);
  74. }
  75. String inputPath = args[0];
  76. String outputPath = args[1];
  77. int numReduceTasks = 16;
  78. Configuration conf = this.getConf();
  79. conf.setBoolean("mapred.output.compress", true);
  80. conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
  81. Job job = Job.getInstance(conf);
  82. job.setJobName("vacation_order_by_jifeng.si");
  83. job.setJarByClass(VacationOrderBySex.class);
  84. job.setMapperClass(VacationOrderBySexMapper.class);
  85. job.setReducerClass(VacationOrderBySexReducer.class);
  86. job.setMapOutputKeyClass(Text.class);
  87. job.setMapOutputValueClass(Text.class);
  88. job.setOutputKeyClass(NullWritable.class);
  89. job.setOutputValueClass(Text.class);
  90. FileInputFormat.setInputPaths(job, inputPath);
  91. FileOutputFormat.setOutputPath(job, new Path(outputPath));
  92. job.setNumReduceTasks(numReduceTasks);
  93. boolean success = job.waitForCompletion(true);
  94. return success ? 0 : 1;
  95. }
  96. }
在生成输出的reduce中,在setup()方法中构造一个MultipleOutputs的实例并将它赋予一个实例变量。在reduce()方法中使用MultipleOutputs实例来写输出,而不是context。write()方法作用于键,值和名字。这里使用的是性别作为名字,因此最后产生的输出名称的形式为sex-r-nnnnn:
 
  
  1. -rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
  2. -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
  3. -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
  4. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
  5. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
  6. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
  7. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
  8. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
  9. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
  10. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
  11. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
  12. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz
我们可以看到在输出文件中不仅有我们想要的输出文件类型,还有part-r-nnnnn形式的文件,但是文件内没有信息,这是程序默认的输出文件。所以我们在指定输出文件名称时(name-r-nnnnn),不要指定name为part,因为它已经被使用为默认值了。
2. 多目录输出

在MultipleOutputs的write()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录。例如,我们改动上面的需求:按男女性别来区分度假订单数据,不同性别数据位于不同子目录(例如:sex=f/part-r-00000)。

 
  
  1. public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
  2. private MultipleOutputs<NullWritable, Text> multipleOutputs;
  3. @Override
  4. protected void setup(Context context) throws IOException, InterruptedException {
  5. multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  6. }
  7. @Override
  8. protected void reduce(Text key, Iterable<Text> values, Context context)
  9. throws IOException, InterruptedException {
  10. for (Text value : values) {
  11. String basePath = String.format("sex=%s/part", key.toString());
  12. multipleOutputs.write(NullWritable.get(), value, basePath);
  13. }
  14. }
  15. @Override
  16. protected void cleanup(Context context) throws IOException, InterruptedException {
  17. multipleOutputs.close();
  18. }
  19. }
后产生的输出名称的形式为sex=f/part-r-nnnnn或者sex=m/part-r-nnnnn:
 
  
  1. -rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
  2. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
  3. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
  4. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
  5. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
  6. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
  7. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
  8. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
  9. -rw-r--r-- 3 wirelessdev wirelessdev 20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
  10. drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f
  11. drwxr-xr-x - wirelessdev wirelessdev 0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m
3. 延迟输出

FileOutputFormat的子类会产生输出文件(part-r-nnnnn),即使文件是空的,也会产生。我们有时候不想要这些空的文件,我们可以使用LazyOutputFormat进行处理。它是一个封装输出格式,可以指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法即可:

 
  
  1. Configuration conf = this.getConf();
  2. Job job = Job.getInstance(conf);
  3. LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

再次检查一下我们的输出文件(第一个例子):

 
  
  1. sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/
  2. Found 3 items
  3. -rw-r--r-- 3 wirelessdev wirelessdev 0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
  4. -rw-r--r-- 3 wirelessdev wirelessdev 88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
  5. -rw-r--r-- 3 wirelessdev wirelessdev 60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz







目录
相关文章
|
4月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
113 2
|
2月前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
157 3
|
4月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
78 1
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
84 1
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
165 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
71 0
|
4月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
93 0
|
6月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
816 0
|
8月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
78 1

热门文章

最新文章

相关实验场景

更多