Hadoop MapReduce(WordCount) Java编程

简介:

编写WordCount程序数据如下:

hello beijing

hello shanghai

hello chongqing

hello tianjin

hello guangzhou

hello shenzhen

...


1、WCMapper:

package com.hadoop.testHadoop;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;


//4个泛型中,前两个是指定mapper输入数据的类型,KEYIN是输入的key的类型,VALUEIN是输入的value的类型

//map 和 reduce 的数据输入输出都是以 key-value对的形式封装的

//默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,value为这一行的内容

//LongWritable  Text 是hadoop为了序列化 定义的数据类型

public class WCMapper extends  Mapper<LongWritable,Text,Text,LongWritable>{


   //mapreduce框架每读一行数据就调用一次该方法

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] words = line.split(" ");

for(String word:words){

context.write(new Text(word), new LongWritable(1));

}

}

}

2、WCReducer:

package com.hadoop.testHadoop;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

//框架在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法

//<hello,{1,1,1,1,1,1.....}>

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws java.io.IOException ,InterruptedException {

long count=0;

for(LongWritable value:values){

count+=value.get();

}

context.write(key, new LongWritable(count));

}


}


3、WCRunner:

package com.hadoop.testHadoop;


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WCRunner {

public static void main(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job = Job.getInstance(conf);

//设置整个job所用的那些类在哪个jar包

   job.setJarByClass(WCRunner.class);

job.setMapperClass(WCMapper.class);

job.setReducerClass(WCReducer.class);

//map输出数据kv类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

//reduce输出数据kv类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

//执行输入数据的路径

FileInputFormat.setInputPaths(job, new Path("/wordcount/inpput"));

//执行输出数据的路径

FileOutputFormat.setOutputPath(job, new Path("/wordcount/outputmy"));

//将job提交给集群运行 

job.waitForCompletion(true);

}

}


















本文转自lzf0530377451CTO博客,原文链接:http://blog.51cto.com/8757576/1839294,如需转载请自行联系原作者




相关文章
|
2月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
183 0
|
3月前
|
分布式计算 Java Hadoop
Hadoop找到JAVA_HOME变量,并设置其值
【7月更文挑战第19天】
58 3
|
3月前
|
分布式计算 Hadoop Java
如何在Java中使用Hadoop
如何在Java中使用Hadoop
|
3月前
|
分布式计算 Java Hadoop
如何在Java中使用MapReduce
如何在Java中使用MapReduce
|
4月前
|
分布式计算 Hadoop Java
优化大数据处理:Java与Hadoop生态系统集成
优化大数据处理:Java与Hadoop生态系统集成
|
4月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合
|
4月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
47 1
|
3月前
|
存储 分布式计算 并行计算
使用Hadoop构建Java大数据分析平台
使用Hadoop构建Java大数据分析平台
|
3月前
|
分布式计算 Hadoop 大数据
优化大数据处理:Java与Hadoop生态系统集成
优化大数据处理:Java与Hadoop生态系统集成
|
4月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
125 0
下一篇
无影云桌面