MapReduce-案列深度分析

简介: Bigdata的计算框架,从最开始的MapReduce,到离线处理Spark,实时处理FLink。MapReduce在大数据初级阶段是很重要的一项技能。MapReduce分两个阶段 1:Map阶段 2:Reduce阶段

# 示例

  • A:C,F,H,M,E
    B:F,H,E,X,C
    C:B,F,D,E
    D:M,H,B,D

Map阶段


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com._51doit.AnLie.FlowMapper;
import com._51doit.AnLie.FlowReducer;
import com.sun.jersey.api.ConflictException;

public class Friends {
   static class FriendsMapper extends Mapper<LongWritable, Text, Text, Text>
   {
     Text k= new Text();
     Text v= new Text();
      @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] sp = line.split(":");
        String user = sp[0];//value
        v.set(user);
        String fs = sp[1];
        String[] f = fs.split(",");//
        for (String st : f) {//key
            k.set(st);
            context.write(k, v);//输出为b,a c,a等
        }
    } 
   }
   static class FriendsReducer extends Reducer<Text, Text, Text, NullWritable>
   {
       @Override
    protected void reduce(Text key, Iterable<Text> itera, Reducer<Text, Text, Text,NullWritable>.Context context)
            throws IOException, InterruptedException {
           //Text都是一个一个的,我们要把他们的共同用户放置一个list集合里
           List<String> list = new ArrayList<>();
        for (Text text : itera) {
            //Text是同一个对象,但是toString不是同一个对象
            list.add(text.toString());
        }
        Collections.sort(list);
        for (int i = 0; i < list.size()-1; i++) {
            for (int j = i+1; j < list.size(); j++) {
                String k=list.get(i)+"和"+list.get(j)+"的共同好友是:"+key.toString();
                context.write(new Text(k), NullWritable.get());
            }
        }
    }
   }
   public static void main(String[] args) throws Exception {
     //配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置map和reduce类  调用类中自定义的map reduce方法的业务逻辑
        job.setMapperClass(FriendsMapper.class);
        job.setReducerClass(FriendsReducer.class);
        //Mapper的输出格式
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //Reducer的输出格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //输出文件
        FileInputFormat.setInputPaths(job, new Path("D:\\www"));
        //输入文件
        FileOutputFormat.setOutputPath(job, new Path("D:\\rr"));
        //提交任务
        job.waitForCompletion(true);
}
}

第一阶段的结果:
C和D的共同好友是:B A和B的共同好友是:C C和D的共同好友是:D A和B的共同好友是:E A和C的共同好友是:E
B和C的共同好友是:E A和B的共同好友是:F A和C的共同好友是:F B和C的共同好友是:F A和B的共同好友是:H
A和D的共同好友是:H B和D的共同好友是:H A和D的共同好友是:M

Reduce阶段


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Friends2 {
   static class FriendsMapper extends Mapper<LongWritable, Text, Text, Text>
   {
     Text k= new Text();
     Text v= new Text(); 
      @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] sp = line.split(":");
        k.set(sp[0]);
        v.set(sp[1]);
        context.write(k, v);
   }
   }
   static class FriendsReducer extends Reducer<Text, Text, Text, Text>
   {    Text v=new Text();
       @Override
    protected void reduce(Text key, Iterable<Text> itera, Reducer<Text, Text, Text,Text>.Context context)
            throws IOException, InterruptedException {
           
          StringBuilder b = new StringBuilder();
        for (Text text : itera) {
            b.append(text.toString()+"");
        }
       v.set(b.toString().trim());
       context.write(key, v);
   }
   }
   public static void main(String[] args) throws Exception {
     //配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置map和reduce类  调用类中自定义的map reduce方法的业务逻辑
        job.setMapperClass(FriendsMapper.class);
        job.setReducerClass(FriendsReducer.class);
        //Mapper的输出格式
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //Reducer的输出格式
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //输出文件
        FileInputFormat.setInputPaths(job, new Path("D:\\rr"));
        //输入文件
        FileOutputFormat.setOutputPath(job, new Path("D:\\vvv"));
        //提交任务
        job.waitForCompletion(true);
}
}

第二阶段结果:
A和B的共同好友是 FCHE A和C的共同好友是 EF A和D的共同好友是 MH B和C的共同好友是 EF
B和D的共同好友是 H C和D的共同好友是 DB

MapReduce的两个阶段分为治之的思想,奠定了大数据离线批处理的si'xin思想。有兴趣的,可以找一下Goole的那3篇关于离线处理大数据思想的论文。

相关文章
|
10月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
103 1
|
分布式计算 前端开发 Hadoop
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(三)
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(三)
910 0
|
7月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
113 1
|
10月前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
9月前
|
分布式计算 Hadoop Java
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱
120 0
|
分布式计算 Hadoop
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(二)
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(二)
680 0
|
缓存 分布式计算 资源调度
MapReduce执行过程分析【问题】
MapReduce执行过程分析【问题】
200 0
MapReduce执行过程分析【问题】
|
分布式计算 Hadoop
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(四)
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(四)
905 1
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(四)
|
存储 分布式计算 Hadoop
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(一)
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(一)
544 1
【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(一)
|
SQL 分布式计算 并行计算
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)
319 0
【手把手 脑把脑】教会你使用idea基于MapReduce的统计数据分析(从问题分析到代码编写)(一)