## 3.4 六个mapreduce

### 3.4.1 计算每门成绩的最高分、最低分、平均分(Mma)

package max_min_avg;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄

*/
public class MmaMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable key1,Text value1,Context context)throws IOException,InterruptedException{
//将文件的每一行传递过来，使用split分割后利用字符数组进行接收
String[] splits = value1.toString().split(",");
//向Reducer传递参数-> Key：课程 Value：成绩
context.write(new Text(splits[0]),new Text(splits[2]));
}
}
package max_min_avg;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MmaReducer extends Reducer<Text, Text,Text, Text> {
@Override
protected void reduce(Text key,Iterable<Text> value,Context context)throws IOException,InterruptedException{
//Arraylist集合储存所有的成绩数据，借用collections的方法求最大值最小值
List<Integer> list = new ArrayList<>();
for(Text v: value){
}
//求max及min
int maxScore = Collections.max(list);
int minScore = Collections.min(list);
// 求平均成绩
int sum = 0;
for(int score: list){
sum += score;
}
double avg = sum  / list.size();
System.out.println("*****************************************");
String result = "的最高分:"+maxScore+"    最低分:"+minScore+"    平均分:"+avg;
System.out.println(key.toString()+result);
context.write(key,new Text(result));
}
}
package max_min_avg;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MmaMain {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
//创建job和“统计相同课程相同分数的人数”任务入口
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MmaMain.class);
//设置Mapper和Reducer的入口
job.setMapperClass(MmaMapper.class);
job.setReducerClass(MmaReducer.class);
//设置Mapper的输入输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置Reducer的输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定输入输出路径
String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
String outputPath = "hdfs://localhost:9000/mapreduce/output/最大值最小值平均值.txt";
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
//输出路径存在的话就删除，不然就只能手动删除，否则会报该文件已存在的异常
FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
if (fileSystem.exists(new Path(outputPath))) {
fileSystem.delete(new Path(outputPath), true);
}
//执行job
job.waitForCompletion(true);
}
}

### 3.4.2 计算每个学生的总分及平均成绩并进行排序(Sas)

package sum_avg_sort;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄

*/
public class SasMapper extends Mapper<LongWritable, Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//将文件的每一行传递过来，使用split分割后利用字符数组进行接收
String[] stu = value.toString().split(",");
//向Reducer传递参数-> Key：学生姓名 Value：成绩
context.write(new Text(stu[1]),new Text(stu[2]));
}
}
package sum_avg_sort;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SasReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
System.out.println("*********************************************************************");
//定义一个ArrayList集合接收该学生的各项成绩
List<Integer> scores = new ArrayList<>();
for(Text value:values){
}
//对该学生的成绩进行求总分、平均分
int num = 0, sum = 0;
for(Integer score:scores){
sum = sum + score.intValue();
num = num + 1;
}
float avg = sum / num;
//成绩排序
Collections.sort(scores);
//使用一个字符串拼接排好序的所有成绩
String sort = "的总分:"+sum+" 平均分:"+avg+" 该生的成绩从低到高排序是:";
for(Integer score:scores){
sort = sort + score + "  ";
}
System.out.println(key.toString()+sort);
//输出
context.write(key,new Text(sort));
}
}
package sum_avg_sort;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SasMain {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
//创建一个job和任务的入口
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SasMain.class);
//设置mapper和reducer的入口
job.setMapperClass(SasMapper.class);
job.setReducerClass(SasReducer.class);
//设置mapper输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置reducer的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定输入输出路径
String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
String outputPath = "hdfs://localhost:9000/mapreduce/output/每个学生总分平均分排序.txt";
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
//输出路径存在的话就删除，不然就只能手动删除，否则会报该文件已存在的异常
FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
if (fileSystem.exists(new Path(outputPath))) {
fileSystem.delete(new Path(outputPath), true);
}
//执行job
job.waitForCompletion(true);
}
}

### 3.4.3 统计所有学生的信息(Si)

package student_info;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄

*/
public class SiMapper extends Mapper<LongWritable,Text,Text,Text> {
@Override
protected void map(LongWritable Key1, Text value1,Context context) throws IOException, InterruptedException {
//将文件的每一行传递过来，使用split分割后利用字符数组进行接收
String[] splits= value1.toString().split(",");
//拼接姓名+性别+年龄
String name = splits[1];
String sex = splits[3];
String age = splits[4];
String stu_info = name+"-"+sex+"-"+age;
//拼接课程+成绩
String course = splits[0];
String score = splits[2];
String course_info = course+"-"+score;
//向Reducer传递参数-> Key：姓名+性别+年龄  Value：课程+成绩
context.write(new Text(stu_info),new Text(course_info));
}
}
package student_info;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SiReducer extends Reducer<Text, Text,Text, Text> {
@Override
protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
//拼接学生各科考试成绩信息
String scoreInfo = "";
for(Text value:values){
scoreInfo = scoreInfo + value+"   ";
}
System.out.println("********************************************************");
System.out.println(key.toString()+"\n"+scoreInfo);
context.write(key,new Text(scoreInfo));
}
}
package student_info;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SiMain {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
//创建job和“统计相同课程相同分数的人数”任务入口
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SiMain.class);
//设置Mapper和Reducer的入口
job.setMapperClass(SiMapper.class);
job.setReducerClass(SiReducer.class);
//设置Mapper的输入输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置Reducer的输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定输入输出路径
String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
String outputPath = "hdfs://localhost:9000/mapreduce/output/学生信息.txt";
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outputPath));
//输出路径存在的话就删除，不然就只能手动删除，否则会报该文件已存在的异常
FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
if (fileSystem.exists(new Path(outputPath))) {
fileSystem.delete(new Path(outputPath), true);
}
//执行job
job.waitForCompletion(true);
}
}

