Hand-Written WordCount Example
Requirement: count the total number of occurrences of each word in a given text file and output the results.
Prepare the sample data as follows:
cd /export/servers
vim wordcount.txt

Contents of wordcount.txt:

hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop

Upload the file to HDFS:

hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
Define a Mapper class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input value is one line of text; split it into words on commas
        String line = value.toString();
        String[] split = line.split(",");
        for (String word : split) {
            // Emit <word, 1> for every word found
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
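A small optional refinement, not part of the original listing: the map method above allocates a new Text and a new LongWritable for every word. A common Hadoop idiom is to reuse Writable instances, since context.write serializes the pair immediately, so mutating the objects afterwards is safe. A minimal sketch:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across calls: context.write() serializes the pair right away,
    // so reusing these objects does not corrupt earlier output
    private final Text outKey = new Text();
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String word : value.toString().split(",")) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }
}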
Define a Reducer class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    /**
     * Custom reduce logic.
     * Each key is a word; the values are the counts emitted for that word.
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Define a main (driver) class that describes and submits the job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        // Required when the packaged jar runs on a cluster: locates the jar
        // to ship by finding the class that contains it
        job.setJarByClass(JobMain.class);

        // Step 1: read the input files and parse them into key/value pairs
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.100.129:8020/wordcount"));

        // Step 2: set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Output types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3 through 6 (partition, sort, combine, group) are omitted; the defaults apply

        // Step 7: set the reducer class
        job.setReducerClass(WordCountReducer.class);
        // Output types of the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output format and the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.100.129:8020/wordcount_out"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    /**
     * Entry point of the program.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool tool = new JobMain();
        int run = ToolRunner.run(configuration, tool, args);
        System.exit(run);
    }
}
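An optional optimization worth knowing, not in the original listing: because the word counts are summed and WordCountReducer's input and output types are identical (Text, LongWritable), the same class can also be registered as a combiner to pre-aggregate <word, count> pairs on the map side and cut shuffle traffic. A sketch of the one extra line, placed in run() next to setReducerClass:

// Optional: pre-aggregate counts locally before the shuffle
job.setCombinerClass(WordCountReducer.class);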
After the code is finished, package it into a jar and run it on the server. This is the standard flow in real-world work: package the code into a jar with a main method as the program entry point, then submit it to the cluster to run.
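As an illustration, with a Maven project the packaging and upload step might look like this (the jar name matches the run command below; the server host and target directory are assumptions):

# Build the jar in the project root; the name comes from the pom's artifactId and version
mvn clean package
# Copy it to the server (host and target directory are illustrative)
scp target/hadoop_hdfs_operate-1.0-SNAPSHOT.jar root@192.168.100.129:/export/servers/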
Run command:
hadoop jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain
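If the job finishes successfully, the result can be checked with hdfs dfs -cat. For the sample wordcount.txt above, the output (word, a tab, then the total count, sorted by key) should look like this:

hdfs dfs -cat /wordcount_out/part-r-00000

flume	1
hadoop	2
hello	2
hive	1
jerry	1
kitty	1
sqoop	1
tom	1
world	2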
Bugs encountered when writing the code by hand:
1. Running on the cluster reported no error on the console; enable the JobHistory server and open its web UI (port 19888) in a browser to locate the actual error.
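If the JobHistory server is not running yet, it can be started first (the command below is the Hadoop 2.x script; on Hadoop 3.x it is mapred --daemon start historyserver). The host is assumed to be the same 192.168.100.129 used above:

mr-jobhistory-daemon.sh start historyserver

Then open http://192.168.100.129:19888 in a browser, find the failed job, and drill into its map/reduce task logs to locate the stack trace.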