MapReduce编程:数据过滤保存、UID 去重

简介: MapReduce编程:数据过滤保存、UID 去重


MapReduce编程:数据过滤保存、UID 去重

一、实验目标

  1. 熟练掌握Mapper类,Reducer类和main函数的编写
  2. 熟练掌握在本地测试方法
  3. 熟练掌握集群上进行分布式程序测试
  4. 掌握用户UID去重实现方法
  5. 掌握MapReduce数据过滤方法

二、实验要求及注意事项

  1. 给出每个实验的主要实验步骤、实现代码和测试效果截图。
  2. 对本次实验工作进行全面的总结分析。
  3. 所有程序需要本地测试和集群测试,给出相应截图。
  4. 建议工程名,类名或包名等做适当修改,显示个人学号或者姓名
  5. 注意:sogou.txt文件上传到HDFS

三、实验内容及步骤

实验任务1:数据过滤及保存,输入文件为搜狗日志文件(公共目录下数据文件/experiment/sogou.txt上传到HDFS的根目录),对输入的每行数据做判断,只把搜索的关键词中包含数字的用户uid和关键词输出到HDFS上。展示结果为成功过滤出搜索关键词包含数字的用户ID和其搜索的关键词,如图所示:

主要实现步骤和运行效果图:

(1)进入虚拟机并启动Hadoop集群,完成文件上传。

(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;

(3)完成代码编写。

Map

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WjwMap extends Mapper<Object, Text, Text, Text>{
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
        String [] str = value.toString().split("\t");
        if(str != null && str.length == 6){
            String keyword = str[2];
            String uid = str[1];
            if(keyword.matches(".*\\d+.*")){
                context.write(new Text(uid), new Text(keyword));
            }
        }
    }
}

Reduce

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WjwReduce extends Reducer< Text, Text,  Text, Text>{
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
        for (Text val:values){
            context.write(key, val);
        }
    }
}

Main

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WjwMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        if(null == args || args.length != 2){
            System.err.println("<Usage>:WjwMain need <input> <output>");
            System.exit(1);
        }
        String in = args[0];
        String out = args[1];
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WjwMain");
        job.setJarByClass(WjwMain.class);
        job.setMapperClass(WjwMap.class);
        job.setReducerClass(WjwReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

(4)测试程序,并查看输出结果。

实验任务2:使用mapreduce编程,统计sogou.txt文件中所有去重后的用户UID,实现效果如图1所示。2.MapReduce编程:UID去重,完成打印sogou日志中独立uid,展示的结果为sogou.txt文件中所有去重后的用户UID。实现效果如下图所示:

主要实现步骤和运行效果图:

(1)进入虚拟机并启动Hadoop集群,完成文件上传。

(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;

(3)完成代码编写。

UidMap

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WjwUidMap extends Mapper<Object, Text, Text, NullWritable>{
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
        String [] arr = value.toString().split("\t");
        String uid = arr[1];
        context.write(new Text(uid), NullWritable.get());
    }
}

UidReduce

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WjwUidReduce extends Reducer< Text, NullWritable, Text, NullWritable>{
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException{
        context.write(key, NullWritable.get());
    }
}

UidMain

package com.wjw.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WjwUidMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
        if(null == args || args.length != 2){
            System.err.println("<Usage>:WjwUidMain need <input> <output>");
            System.exit(1);
        }
        String in = args[0];
        String out = args[1];
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WjwUidMain");
        job.setJarByClass(WjwUidMain.class);
        job.setMapperClass(WjwUidMap.class);
        job.setReducerClass(WjwUidReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

(4)测试程序,并查看输出结果。

实验任务3:使用Java API编程实现创建一个目录,删除一个目录,读文件和两个写文件的函数,其中一个写文件函数,用于将磁盘文件内容写入到HDFS一个自定义文件中,另一个写文件函数,将HDFS中一个目录下的文件内容写入到HDFS中另一个目录文件中。文件名字自拟。要求将所有函数封装到同一个类中,在主函数中调用,进行测试。

主要实现步骤和运行效果图:

(1)进入虚拟机并启动Hadoop集群,完成文件上传。

(2)启动Eclipse客户端,新建一个java工程;在该工程中创建package,导入jar包,完成环境配置,依次创建包、Mapper类,Reducer类和主类等;

(3)完成代码编写。

Main

package hadoop;
import java.io.BufferedInputStream; 
import java.io.FileInputStream;
import java.io.IOException; 
import java.net.URI;
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils;
public class WjwFile {
public static void Read(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://master:9000");
    FileSystem fs = null;
    FSDataInputStream in =null;
    fs = FileSystem.get(conf);
    in = fs.open(new Path(args[2]));
    IOUtils.copyBytes(in, System.out, 4096, false);
}
public static void Write1(String[] args) throws IOException {
    Configuration conf = new Configuration();
    BufferedInputStream in = null;
    FileSystem fs = null;
    FSDataOutputStream out = null;
    in = new BufferedInputStream(new FileInputStream(args[1]));
    fs = FileSystem.get(URI.create(args[2]), conf);
    out = fs.create(new Path(args[2]));
    IOUtils.copyBytes(in, out, 4096, false);
}
public static void Write2(String[] args) throws IOException {
    Configuration conf = new Configuration();
    BufferedInputStream in = null;
    FileSystem fs = null;
    FSDataOutputStream out = null;
    Path path = new Path(args[2]);
    fs = FileSystem.get(URI.create(args[3]), conf);
    in = new BufferedInputStream(fs.open(path));
    out = fs.create(new Path(args[3]));
    IOUtils.copyBytes(in, out, 4096, false);
}
public static void Mkdir(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = null;
    fs = FileSystem.get(URI.create(args[0]), conf);
    fs.mkdirs(new Path(args[0]));
}
public static void Delete(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = null;
    fs = FileSystem.get(URI.create(args[0]), conf);
    fs.delete(new Path(args[0]), true);
}
public static void main(String[] args) throws IOException{
    Mkdir(args);
    System.out.println("Directory created.");
    Write1(args);
    System.out.println("File1 written.");
    Read(args);
    System.out.println("File read.");
    Write2(args);
    System.out.println("File2 written.");
    Delete(args);
    System.out.println("Directory deleted.");
    }
}

(4)测试程序,并查看输出结果。

目录
相关文章
|
14天前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
25 1
|
14天前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
27 0
|
3月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
51 15
|
3月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
41 1
|
3月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
29 0
|
3月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
41 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
90 0
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
63 1
|
3月前
|
数据采集 SQL 分布式计算
|
4月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
下一篇
云函数