MapReduce编程:join操作和聚合操作

简介: MapReduce编程:join操作和聚合操作


MapReduce 编程:join操作和聚合操作

一、实验目标

  1. 理解MapReduce计算框架的分布式处理工作流程
  2. 掌握用mapreduce计算框架实现Map端的本地聚合操作
  3. 掌握MapReduce编程的map端join操作

二、实验要求及注意事项

  1. 给出每个实验的主要实验步骤、实现代码和测试效果截图。
  2. 对本次实验工作进行全面的总结分析。
  3. 所有程序需要本地测试和集群测试,给出相应截图。
  4. 建议工程名,类名或包名等做适当修改,显示个人学号或者姓名

三、实验内容及步骤

实验任务1:使用MapReduce编程,如果涉及到join操作,一般使用的是reduce端的join;但如果其中一个文件较小,可以将其添加到分布式缓存当中去,在map阶段时,每一个map task开始运行前,先从分布式缓存中取出此小文件,在map方法中对数据进行join操作,即map join操作。本实验使用的输入文件分别是uid-list和sogou,uid-lis保存着搜索过“电影”的用户的UID(比较小),sogou是日志文件;对于处于uid-list中的用户,把他们在sogou日志文件中的uid及搜索关键词输出到HDFS。实现效果如图1和图2所示。

主要实现步骤和运行效果图:

(1)进入虚拟机并启动Hadoop集群,完成文件上传。

(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;

(3)完成代码编写。

JoinMap

package hadoop;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
public class WjwJoinMap extends Mapper<Object, Text, Text, Text>{
  private Map<String, Integer> map = new HashMap<String, Integer>();
  private Text uid = new Text();
  protected void setup(Context context) throws IOException, InterruptedException{
    @SuppressWarnings("resource")
    BufferedReader br = new BufferedReader(new FileReader("uuid"));
    String line = null;
    while((line=br.readLine())!=null){
      System.out.println(line);
      map.put(line.trim(), 1);
    }
  }
  protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
    String[] arr = value.toString().split("\t");
    String keyword = arr[2];
    if(arr[1]!=null && map.get(arr[1])!=null){
      uid.set(arr[1]);
      context.write(uid, new Text(keyword));
    }
  }
}

JoinMain

package hadoop;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class WjwJoinMain {
  public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException{
    if(args==null || args.length!=3){
      System.out.println("error");
      System.exit(0);
    }
    Job job = Job.getInstance(new Configuration(), "WjwJoinMain");
    job.setJarByClass(WjwJoinMain.class);
    job.setMapperClass(WjwJoinMap.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    Path path = new Path(args[0]);
    String pathLink = path.toUri().toString() + "#uuid";
    job.addCacheFile(new URI(pathLink));
    FileInputFormat.addInputPath(job, new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.waitForCompletion(true);
  }
}

(4)测试程序,并查看输出结果。

实验任务2: Map端本地聚合,读取文本文件/home/zkpk/word.txt,进行单词计数,为了减少网络传输数据量,且使用本地聚合不会影响最终结果,在map端进行本地聚合。

主要实现步骤和运行效果图:

(1)进入虚拟机并启动Hadoop集群,完成文件上传。

(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;

(3)完成代码编写。

WordMap

package hadoop;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import org.apache.hadoop.io.*;
public class WjwWordMap extends Mapper<Object, Text, Text, IntWritable>{
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
      String arr[] = value.toString().split(" ");
      for(String word:arr){
        context.write(new Text(word), new IntWritable(1));
      }
    }
}

WordReduce

package hadoop;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class WjwWordReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
    int sum = 0;
    for(IntWritable val:values){
      sum += val.get();
    }
    context.write(key, new IntWritable(sum));
  }
}

WordMain

package hadoop;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
public class WjwWordMain {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
    if(args==null || args.length!=2){
      System.out.println("error");
    }
    Job job = Job.getInstance(new Configuration(), "WjwWordMain");
    job.setJarByClass(WjwWordMain.class);
    job.setMapperClass(WjwWordMap.class);
    job.setCombinerClass(WjwWordReduce.class);
    job.setReducerClass(WjwWordReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
  }
}

(4)测试程序,并查看输出结果。

目录
相关文章
|
7月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
107 0
|
2月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
46 3
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
51 1
|
7月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
107 0
|
4月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
78 1
|
6月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
97 15
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
66 1
|
7月前
|
分布式计算 资源调度 Hadoop
MapReduce分布式编程
MapReduce分布式编程
87 1
|
6月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
54 0
|
6月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
99 0