MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现。它通常用于在分布式计算环境中处理大规模数据集,如在Hadoop中。下面是一个简单的MapReduce示例,用Java编写,用于计算一组数字的总和:
首先是Mapper类,用于将输入数据转换为键值对(key-value pairs):
```java import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SumMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text("total"); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); int number = Integer.parseInt(line); context.write(word, new IntWritable(number)); } } ```
然后是Reducer类,用于对Mapper输出的键值对进行合并计算:
```java import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } ```
最后是主程序,用于设置作业并运行MapReduce作业:
```java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SumDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "sum"); job.setJarByClass(SumDriver.class); job.setMapperClass(SumMapper.class); job.setCombinerClass(SumReducer.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } ```
在这个示例中,SumMapper类负责将输入数据转换为键值对,其中键是固定的"total",值是输入数据中的数字。SumReducer类负责对相同键的值进行合并计算,最终输出总和。主程序SumDriver类设置了作业的各种参数,并运行MapReduce作业。
MapReduce是一种经典的并行计算模型,用于处理大规模数据集。它由两个主要阶段组成:Map阶段和Reduce阶段。在Map阶段,数据被拆分成若干组,每组由一个键值对表示。Map任务将每个键值对处理成零个或多个新的键值对,并按照键将它们分组。在Reduce阶段,相同键的值被合并,并通过用户自定义的reduce函数进行处理。
MapReduce的工作流程如下:
1. **输入分片**:将输入数据集分成若干个小的数据块,每个数据块称为一个输入分片。
2. **映射阶段(Map Phase)**:对每个输入分片应用Map函数,将输入键值对映射成零个或多个中间键值对。这些中间键值对不需要与原始键值对的键相同。
3. **分组(Shuffle)**:将中间键值对按照键分组,以便后续的Reduce阶段能够处理相同键的所有值。
4. **Reduce阶段**:对每个中间键的所有值应用Reduce函数,将它们合并成一个或多个输出键值对。
5. **输出**:将Reduce阶段产生的输出键值对写入文件系统。
MapReduce模型的优点是能够有效处理大规模数据,并且具有良好的扩展性和容错性。它被广泛应用于分布式计算领域,如日志分析、数据挖掘和机器学习等。