3.4 The Six MapReduce Programs
3.4.1 Computing each course's highest, lowest, and average score (Mma)
// MmaMapper.java
package max_min_avg;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Input record layout:
 *   stu[0]: course name   stu[1]: student name   stu[2]: score
 *   stu[3]: gender        stu[4]: age
 * This job computes each course's highest, lowest, and average score.
 */
public class MmaMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Each input line arrives here; split it on the comma into a string array.
        String[] splits = value1.toString().split(",");
        // Emit to the Reducer -> key: course, value: score
        context.write(new Text(splits[0]), new Text(splits[2]));
    }
}

// MmaReducer.java
package max_min_avg;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MmaReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Collect all scores of the course in an ArrayList so that
        // Collections can supply the maximum and minimum.
        List<Integer> list = new ArrayList<>();
        for (Text v : value) {
            list.add(Integer.valueOf(v.toString()));
        }
        // Maximum and minimum
        int maxScore = Collections.max(list);
        int minScore = Collections.min(list);
        // Average score; cast to double before dividing, otherwise the
        // integer division silently truncates the fractional part.
        int sum = 0;
        for (int score : list) {
            sum += score;
        }
        double avg = (double) sum / list.size();
        System.out.println("*****************************************");
        String result = " highest: " + maxScore + " lowest: " + minScore + " average: " + avg;
        System.out.println(key.toString() + result);
        context.write(key, new Text(result));
    }
}

// MmaMain.java
package max_min_avg;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MmaMain {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        // Create the job: entry point of the "max/min/average per course" task
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MmaMain.class);
        // Register the Mapper and Reducer
        job.setMapperClass(MmaMapper.class);
        job.setReducerClass(MmaReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths
        String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
        String outputPath = "hdfs://localhost:9000/mapreduce/output/最大值最小值平均值.txt";
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Delete the output path if it already exists; otherwise it has to be
        // removed by hand, and the job fails with a "file already exists" exception.
        FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }
        // Run the job
        job.waitForCompletion(true);
    }
}
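Buffering every score in an ArrayList works for classroom-sized data, but the maximum, minimum, and sum can also be tracked in a single pass, so the reducer never holds a whole course's scores in memory. Below is a minimal single-pass sketch with the same key/value types as above; MmaStreamingReducer is a hypothetical name, not part of the project.

// MmaStreamingReducer.java -- hypothetical single-pass variant of MmaReducer
package max_min_avg;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MmaStreamingReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Track max/min/sum while iterating instead of buffering the scores.
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        int sum = 0;
        int count = 0;
        for (Text v : values) {
            int score = Integer.parseInt(v.toString());
            max = Math.max(max, score);
            min = Math.min(min, score);
            sum += score;
            count++;
        }
        // Cast before dividing so the average keeps its fractional part.
        double avg = (double) sum / count;
        context.write(key, new Text(" highest: " + max + " lowest: " + min + " average: " + avg));
    }
}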
3.4.2 Computing each student's total and average score, with the scores sorted (Sas)
// SasMapper.java
package sum_avg_sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Input record layout:
 *   stu[0]: course name   stu[1]: student name   stu[2]: score
 *   stu[3]: gender        stu[4]: age
 * This job computes each student's total and average score and sorts the scores.
 */
public class SasMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line arrives here; split it on the comma into a string array.
        String[] stu = value.toString().split(",");
        // Emit to the Reducer -> key: student name, value: score
        context.write(new Text(stu[1]), new Text(stu[2]));
    }
}

// SasReducer.java
package sum_avg_sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SasReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("*********************************************************************");
        // Collect this student's scores in an ArrayList
        List<Integer> scores = new ArrayList<>();
        for (Text value : values) {
            scores.add(Integer.valueOf(value.toString()));
        }
        // Total and average; cast to float before dividing, otherwise the
        // integer division truncates the fractional part.
        int num = 0, sum = 0;
        for (Integer score : scores) {
            sum = sum + score.intValue();
            num = num + 1;
        }
        float avg = (float) sum / num;
        // Sort the scores in ascending order
        Collections.sort(scores);
        // Append the sorted scores to one result string
        String sort = " total: " + sum + " average: " + avg + " scores from low to high: ";
        for (Integer score : scores) {
            sort = sort + score + " ";
        }
        System.out.println(key.toString() + sort);
        context.write(key, new Text(sort));
    }
}

// SasMain.java
package sum_avg_sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class SasMain {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        // Create the job and its entry point
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SasMain.class);
        // Register the Mapper and Reducer
        job.setMapperClass(SasMapper.class);
        job.setReducerClass(SasReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths
        String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
        String outputPath = "hdfs://localhost:9000/mapreduce/output/每个学生总分平均分排序.txt";
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Delete the output path if it already exists; otherwise it has to be
        // removed by hand, and the job fails with a "file already exists" exception.
        FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }
        // Run the job
        job.waitForCompletion(true);
    }
}
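Collections.sort arranges the list in ascending order, which is why the output reads "from low to high". If a descending report were wanted instead, a reversed comparator is enough, and a StringBuilder avoids the repeated copying that += on a String does inside the loop. A small standalone sketch; SortDemo is a hypothetical class and the scores are made-up sample values:

// SortDemo.java -- hypothetical demo, not part of the job above
package sum_avg_sort;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortDemo {
    public static void main(String[] args) {
        List<Integer> scores = new ArrayList<>(Arrays.asList(78, 92, 65, 88));
        // Sort from high to low instead of the default ascending order
        Collections.sort(scores, Collections.reverseOrder());
        // Join with a StringBuilder rather than repeated string concatenation
        StringBuilder sb = new StringBuilder("scores from high to low: ");
        for (Integer score : scores) {
            sb.append(score).append(' ');
        }
        System.out.println(sb); // scores from high to low: 92 88 78 65
    }
}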
3.4.3 Aggregating all students' information (Si)
// SiMapper.java
package student_info;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
 * Input record layout:
 *   stu[0]: course name   stu[1]: student name   stu[2]: score
 *   stu[3]: gender        stu[4]: age
 * This job aggregates every student's course and exam information.
 */
public class SiMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Each input line arrives here; split it on the comma into a string array.
        String[] splits = value1.toString().split(",");
        // Concatenate name + gender + age
        String name = splits[1];
        String sex = splits[3];
        String age = splits[4];
        String stu_info = name + "-" + sex + "-" + age;
        // Concatenate course + score
        String course = splits[0];
        String score = splits[2];
        String course_info = course + "-" + score;
        // Emit to the Reducer -> key: name+gender+age, value: course+score
        context.write(new Text(stu_info), new Text(course_info));
    }
}

// SiReducer.java
package student_info;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SiReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all of this student's course/score entries
        String scoreInfo = "";
        for (Text value : values) {
            scoreInfo = scoreInfo + value + " ";
        }
        System.out.println("********************************************************");
        System.out.println(key.toString() + "\n" + scoreInfo);
        context.write(key, new Text(scoreInfo));
    }
}

// SiMain.java
package student_info;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class SiMain {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        // Create the job: entry point of the "aggregate student information" task
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SiMain.class);
        // Register the Mapper and Reducer
        job.setMapperClass(SiMapper.class);
        job.setReducerClass(SiReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths
        String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
        String outputPath = "hdfs://localhost:9000/mapreduce/output/学生信息.txt";
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Delete the output path if it already exists; otherwise it has to be
        // removed by hand, and the job fails with a "file already exists" exception.
        FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }
        // Run the job
        job.waitForCompletion(true);
    }
}
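The three driver classes above are identical except for the job class, the Mapper/Reducer pair, and the paths, so the setup boilerplate could live in one shared helper. A minimal sketch under that assumption follows; JobRunner and runJob are hypothetical names, not part of the original code.

// JobRunner.java -- hypothetical shared driver for the three jobs above
package student_info;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class JobRunner {
    public static boolean runJob(Class<?> jarClass,
                                 Class<? extends Mapper> mapperClass,
                                 Class<? extends Reducer> reducerClass,
                                 String inputPath,
                                 String outputPath) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(jarClass);
        job.setMapperClass(mapperClass);
        job.setReducerClass(reducerClass);
        // All three jobs use Text for every key and value.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Remove a stale output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(new URI(outputPath), conf);
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true);
        }
        return job.waitForCompletion(true);
    }
}

With such a helper, each driver would shrink to a single call, e.g. JobRunner.runJob(SiMain.class, SiMapper.class, SiReducer.class, inputPath, outputPath).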