MapReduce多个job同时使用的方式(从网上找到的案例,原始博文:http://www.cnblogs.com/yjmyzz/p/4540469.html)

简介: 复杂的MapReduce处理中,往往需要将复杂的处理过程,分解成多个简单的Job来执行,第1个Job的输出做为第2个Job的输入,相互之间有一定依赖关系。以上一篇中的求平均数为例,可以分解成三个步骤:1. 求Sum2. 求Count3. 计算平均数每1个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果做为输入,下面的代码演示了如何将这3个

复杂的MapReduce处理中,往往需要将复杂的处理过程,分解成多个简单的Job来执行,第1个Job的输出做为第2个Job的输入,相互之间有一定依赖关系。以上一篇中的求平均数为例,可以分解成三个步骤:

1. 求Sum

2. 求Count

3. 计算平均数

每1个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果做为输入,下面的代码演示了如何将这3个Job串起来

代码:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Avg2 {

	private static final Text TEXT_SUM = new Text("SUM");
	private static final Text TEXT_COUNT = new Text("COUNT");
	private static final Text TEXT_AVG = new Text("AVG");
	
	public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
		public long sum = 0;
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			sum += value.toString().length();
		}
		
		@Override
		protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_SUM, new LongWritable(sum));
		}
	}
	
	public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
		public long sum = 0;
		
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values,Context context) 
				throws IOException, InterruptedException {
			for (LongWritable v : values) {
				sum += v.get();
			}
			context.write(TEXT_SUM, new LongWritable(sum));
		}
	}
	
	//计算Count
	public static class CountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
		public long count = 0;
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			count += 1;
		}
		
		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_COUNT, new LongWritable(count));
		}
	}
	
	public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
		public long count = 0;
		
		@Override
		public void reduce(Text key, Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			for (LongWritable v : values) {
				count += v.get();
			}
			context.write(TEXT_COUNT, new LongWritable(count));
		}
	}
	
	//计算Avg 
	public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
		public long count = 0;
		public long sum = 0;
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] v = value.toString().split("\t");
			if (v[0].equals("COUNT")) {
				count = Long.parseLong(v[1]);
			} else if (v[0].equals("SUM")) {
				sum = Long.parseLong(v[1]);
			}
		}
		
		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(sum), new LongWritable(count));
		}
	}
	
	public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
		public long sum = 0;
		public long count = 0;
		
		@Override
		protected void reduce(LongWritable key, Iterable<LongWritable> values,Context context)
				throws IOException, InterruptedException {
			sum += key.get();
			for(LongWritable v : values) {
				count += v.get();
			}
		}
		
		@Override
		protected void cleanup(Reducer<LongWritable, LongWritable, Text, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		String inputPath = "E:/wordcount/input/a.txt";
		String maxOutputPath = "E:/wordcount/output/max/";
		String countOutputPath = "E:/wordcount/output/count/";
		String avgOutputPath = "E:/wordcount/output/avg/";
		
		Job job1 = Job.getInstance(conf, "Sum");
		job1.setJarByClass(Avg2.class);
		job1.setMapperClass(SumMapper.class);
		job1.setCombinerClass(SumReducer.class);
		job1.setReducerClass(SumReducer.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job1, new Path(inputPath));
		FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));
		
		Job job2 = Job.getInstance(conf, "Count");
		job2.setJarByClass(Avg2.class);
		job2.setMapperClass(CountMapper.class);
		job2.setCombinerClass(CountReducer.class);
		job2.setReducerClass(CountReducer.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job2, new Path(inputPath));
		FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));
		
		Job job3 = Job.getInstance(conf, "Average");
		job3.setJarByClass(Avg2.class);
		job3.setMapperClass(AvgMapper.class);
		job3.setReducerClass(AvgReducer.class);
		job3.setMapOutputKeyClass(LongWritable.class);
		job3.setMapOutputValueClass(LongWritable.class);
		job3.setOutputKeyClass(Text.class);
		job3.setOutputValueClass(DoubleWritable.class);
		
		//将job1及job2的输出为做job3的输入
		FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
		FileInputFormat.addInputPath(job3, new Path(countOutputPath));
		FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));
		 
		//提交job1及job2,并等待完成
		if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
			System.exit(job3.waitForCompletion(true) ? 0 : 1);
		}
	}

}
运行准备:

准备数据文件:

E:/wordcount/input/a.txt

数据文件的内容如下:


运行后:E:\wordcount\output\count\part-r-00000的值如下:


运行后:

E:\wordcount\output\max\part-r-00000的内容如下:


最终的平均值是:E:\wordcount\output\avg\part-r-00000



目录
相关文章
|
5月前
|
前端开发 Python
HTML笔记+案例(上)
HTML笔记+案例
60 0
|
5月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
285 2
|
2月前
|
移动开发 网络协议 编译器
实战案例3:C语言实现的HTTP服务器
实战案例3:C语言实现的HTTP服务器
88 0
|
4月前
杨老师课堂之网页制作HTML的学习入门-含有案例1
杨老师课堂之网页制作HTML的学习入门-含有案例
38 3
|
4月前
|
移动开发 HTML5
杨校老师课堂之HTML5动画特效的风车案例
杨校老师课堂之HTML5动画特效的风车案例
38 0
|
4月前
|
数据安全/隐私保护
杨老师课堂之网页制作HTML的学习入门-含有案例2
杨老师课堂之网页制作HTML的学习入门-含有案例
22 0
|
5月前
|
数据采集 网络协议
WWW(URL,HTTP,HTML)
WWW(URL,HTTP,HTML)
114 1
|
5月前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
5月前
|
JavaScript
jQuery选择器案例之——index.html
jQuery选择器案例之——index.html
|
5月前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
235 0