流程:
1.开发Map阶段代码
2.开发Reduce阶段代码
3.组装job
Map阶段代码:
publicstaticclassMyMapperextendsMapper<LongWritable, Text,Text,LongWritable>{
Loggerlogger=LoggerFactory.getLogger(MyMapper.class);
@Override
protectedvoidmap(LongWritablek1, Textv1, Contextcontext)
throwsIOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words=v1.toString().split(" ");
//迭代切割出来的单词数据
for (Stringword : words) {
//把迭代出来的单词封装成<k2,v2>的形式
Textk2=newText(word);
LongWritablev2=newLongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
}
Reduce阶段代码:
publicstaticclassMyReducerextendsReducer<Text,LongWritable,Text,LongWritable>{
Loggerlogger=LoggerFactory.getLogger(MyReducer.class);
@Override
protectedvoidreduce(Textk2, Iterable<LongWritable>v2s, Contextcontext)
throwsIOException, InterruptedException {
//创建一个sum变量,保存v2s的和
longsum=0L;
//对v2s中的数据进行累加求和
for(LongWritablev2: v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum+=v2.get();
}
//组装k3,v3
Textk3=k2;
LongWritablev3=newLongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
组装Job:
publicstaticvoidmain(String[] args) {
try{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
//指定Job需要的配置参数
Configurationconf=newConfiguration();
//创建一个Job
Jobjob=Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(WordCountJob.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,newPath(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,newPath(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
}catch(Exceptione){
e.printStackTrace();
}
}
接下来就可以打包发布到集群
指定mapreduce接收到的第一个参数:文件路径
指定mapreduce接收到的第二个参数:输出目录
访问 http://bigdata01:8088 也可以查看任务输出结果
在out输出目录中,_SUCCESS是一个标记文件,有这个文件表示这个任务执行成功了。 part-r-00000是具体的数据文件,如果有多个reduce任务会产生多个这种文件,多个文件的话会按照从0往下排
还要一点需要注意的 ,part 后面的 r 表示这个结果文件是 reduce 步骤产生的, 如果一个 mapreduce 只有 map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。