2.1 导入依赖
MapReduce不需导入的四个依赖(hadoop-client、hadoop-hdfs、hadoop-common、hadoop-mapreduce-client-core)
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency> </dependencies>
2.2 mapper
package course_score_same; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* stu[0]:课程名称 stu[1]:学生姓名 stu[2]:成绩 stu[3]:性别 stu[4]:年龄 该功能实现:统计该课程中成绩相同的学生姓名 */ //Mapper的泛型依次为输入文本的第几行,该行的文本,Mapper的输出key,Mapper的输出value public class CssMapper extends Mapper<LongWritable, Text,Text,Text> { //重写方法:在idea的代码区使用快捷键 alt+insert选择鼠标单击override methods选择map方法 @Override //map方法的三个参数,前两个就是输入文本行号,该行的文本,最后一个Context context固定写法 protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //将文件的每一行传递过来,使用split分割后利用字符数组进行接收 String[] stu = value.toString().split(","); //拼接字符串:课程和成绩 String sc = stu[0]+"\t"+stu[2]; //向Reducer传递参数-> Key:课程+成绩 Value:学生名 context.write(new Text(sc),new Text(stu[1])); } }
2.3 reducer
package course_score_same; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; //Reducer的泛型依次为Mapper输出的key作为Reducer的输入,Mapper输出的value作为Reducer的输入,Reducer的输出key,Reducer的输出value public class CssReducer extends Reducer <Text,Text,Text,Text>{ //重写方法与Mapper一样 @Override //reduce方法的三个参数:Mapper输出的key作为Reducer的输入,Mapper输出的value作为Reducer的输入,最后一个Context context固定写法 protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //创建StringBuffer用来接收该课程中成绩相同的学生的姓名 StringBuffer sb = new StringBuffer(); //num变量用来计数 int num = 0; //遍历values参数,将所有的value拼接进sb,并统计学生数量 for(Text value:values){ sb.append(value.toString()).append(","); num++; } //如果num=1,则表明该课程的这个成绩只有一个学生,否则就输出 if(num>1){ String names = "一共有" + num + "名学生,他们的名字是:" +sb.toString(); System.out.println("*************************************************"); System.out.println(key.toString() + names); context.write(key,new Text(names)); } } }
2.4 main
package course_score_same; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class CssMain { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { //创建job和“统计相同课程相同分数的人数”任务入口 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CssMain.class); //设置Mapper和Reducer的入口 job.setMapperClass(CssMapper.class); job.setReducerClass(CssReducer.class); //设置Mapper的输入输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reducer的输入输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //指定输入输出路径 String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv"; String outputPath = "hdfs://localhost:9000/mapreduce/output/该课程中成绩相同的学生姓名.txt"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outputPath)); //输出路径存在的话就删除,不然就只能手动删除,否则会报该文件已存在的异常 FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf); if (fileSystem.exists(new Path(outputPath))) { fileSystem.delete(new Path(outputPath), true); } //执行job job.waitForCompletion(true); } }
至此,一个完整的MapReduce的编写就已经完全结束了,如果想要别的功能,只需要修改mapper和reducer类中重写方法的方法体本身,还有main方法里面的各项参数值即可。
为了进一步锻炼大家MapReduce确定mapper输出的key和value,下面再找两个例子练习一下(每个人的想法都不一样,所以说并没有标准的答案,合理即可):
统计所有学生的信息—>(key:姓名+性别+年龄;value:课程+成绩)
计算每门成绩的最高分、最低分、平均分—>(key:课程名称;value:成绩)
统计各性别的人数及他们的姓名—>(key:性别;value:姓名)