MapReduce实现数据去重

简介: 一、原理分析   Mapreduce的处理过程,由于Mapreduce会在Map~reduce中,将重复的Key合并在一起,所以Mapreduce很容易就去除重复的行。Map无须做任何处理,设置Map中写入context的东西为不作任何处理的行,也就是Map中最初处理的value即可,而Reduce同样无须做任何处理,写入输出文件的东西就是,最初得到的Key。

一、原理分析

  Mapreduce的处理过程,由于Mapreduce会在Map~reduce中,将重复的Key合并在一起,所以Mapreduce很容易就去除重复的行。Map无须做任何处理,设置Map中写入context的东西为不作任何处理的行,也就是Map中最初处理的value即可,而Reduce同样无须做任何处理,写入输出文件的东西就是,最初得到的Key。

  我原来以为是map阶段用了hashmap,根据hash值的唯一性。估计应该不是...

  Map是输入文件有几行,就运行几次。

二、代码

2.1 Mapper

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DuplicateRemoveMapper extends
		Mapper<LongWritable, Text, Text, Text> {
	//输入文件是数字 不过可能也有字符等 所以用Text,不用LongWritable
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(value, new Text());//后面不能是null,否则,空指针

	}

}

  

2.2 Reducer

package algorithm;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DuplicateRemoveReducer extends Reducer<Text, Text, Text, Text> {

	public void reduce(Text key, Iterable<Text> value, Context context)
			throws IOException, InterruptedException {
		// process values
		context.write(key, null); //可以出处null
	}

}

  

2.3 Main

package algorithm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DuplicateMainMR  {

	public static void main(String[] args) throws Exception{
		// TODO Auto-generated method stub
		Configuration conf = new Configuration(); 
		Job job = new Job(conf,"DuplicateRemove");
		job.setJarByClass(DuplicateMainMR.class);
		job.setMapperClass(DuplicateRemoveMapper.class);
		job.setReducerClass(DuplicateRemoveReducer.class);
		job.setOutputKeyClass(Text.class);
		//输出是null,不过不能随意写  否则包类型不匹配
		job.setOutputValueClass(Text.class);
		
		job.setNumReduceTasks(1);
		//hdfs上写错了文件名 DupblicateRemove  多了个b
		//hdfs不支持修改操作
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.58.180:8020/ClassicalTest/DupblicateRemove/DuplicateRemove.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.58.180:8020/ClassicalTest/DuplicateRemove/DuplicateRemoveOut"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

  

三、输出分析

3.1 输入与输出

没啥要对比的....不贴了

3.2 控制台

 

doop.mapreduce.Job.updateStatus(Job.java:323)
  INFO - Job job_local4032991_0001 completed successfully
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:765)
  INFO - Counters: 38
	File System Counters
		FILE: Number of bytes read=560
		FILE: Number of bytes written=501592
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=48
		HDFS: Number of bytes written=14
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=8
		Map output records=8
		Map output bytes=26
		Map output materialized bytes=48
		Input split bytes=142
		Combine input records=0
		Combine output records=0
		Reduce input groups=6
		Reduce shuffle bytes=48
		Reduce input records=8
		Reduce output records=6
		Spilled Records=16
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=4
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
		Total committed heap usage (bytes)=457179136
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=24
	File Output Format Counters 
		Bytes Written=14
 DEBUG - PrivilegedAction as:hxsyl (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:323)
 DEBUG - stopping client from cache: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - removing client from cache: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@37afeb11
 DEBUG - Stopping client
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: closed
 DEBUG - IPC Client (521081105) connection to /192.168.58.180:8020 from hxsyl: stopped, remaining connections 0

 

目录
相关文章
|
27天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
68 0
|
27天前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
33 0
|
27天前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
41 0
|
3月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
43 0
|
6月前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
6月前
|
分布式计算
如何在MapReduce中处理非结构化数据?
如何在MapReduce中处理非结构化数据?
73 0
|
6月前
|
分布式计算 Java Hadoop
MapReduce编程:数据过滤保存、UID 去重
MapReduce编程:数据过滤保存、UID 去重
101 0
|
6月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
51 0
|
6月前
|
存储 分布式计算 算法
MapReduce【数据压缩】
MapReduce【数据压缩】
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)