【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(二)

简介: 【细节拉满】Hadoop课程设计项目,使用idea编写基于MapReduce的学生成绩分析系统(附带源码、项目文件下载地址)(二)

3.4 六个mapreduce

3.4.1 计算每门成绩的最高分、最低分、平均分(Mma)

package max_min_avg;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄
该功能实现的计算出每门课程中的最高分、最低分、平均分
 */
public class MmaMapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key1,Text value1,Context context)throws IOException,InterruptedException{
        //将文件的每一行传递过来,使用split分割后利用字符数组进行接收
        String[] splits = value1.toString().split(",");
        //向Reducer传递参数-> Key:课程 Value:成绩
        context.write(new Text(splits[0]),new Text(splits[2]));
    }
}
package max_min_avg;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MmaReducer extends Reducer<Text, Text,Text, Text> {
    @Override
    protected void reduce(Text key,Iterable<Text> value,Context context)throws IOException,InterruptedException{
        //Arraylist集合储存所有的成绩数据,借用collections的方法求最大值最小值
        List<Integer> list = new ArrayList<>();
        for(Text v: value){
            list.add(Integer.valueOf(v.toString()));
        }
        //求max及min
        int maxScore = Collections.max(list);
        int minScore = Collections.min(list);
        // 求平均成绩
        int sum = 0;
        for(int score: list){
            sum += score;
        }
        double avg = sum  / list.size();
        System.out.println("*****************************************");
        String result = "的最高分:"+maxScore+"    最低分:"+minScore+"    平均分:"+avg;
        System.out.println(key.toString()+result);
        context.write(key,new Text(result));
    }
}
package max_min_avg;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MmaMain {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
            //创建job和“统计相同课程相同分数的人数”任务入口
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MmaMain.class);
            //设置Mapper和Reducer的入口
            job.setMapperClass(MmaMapper.class);
            job.setReducerClass(MmaReducer.class);
            //设置Mapper的输入输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //设置Reducer的输入输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //指定输入输出路径
            String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
            String outputPath = "hdfs://localhost:9000/mapreduce/output/最大值最小值平均值.txt";
            FileInputFormat.setInputPaths(job,new Path(inputPath));
            FileOutputFormat.setOutputPath(job,new Path(outputPath));
            //输出路径存在的话就删除,不然就只能手动删除,否则会报该文件已存在的异常
            FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
            if (fileSystem.exists(new Path(outputPath))) {
                fileSystem.delete(new Path(outputPath), true);
            }
            //执行job
            job.waitForCompletion(true);
        }
}

3.4.2 计算每个学生的总分及平均成绩并进行排序(Sas)

package sum_avg_sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄
该功能实现:统计每个学生总分平均分并对成绩进行排序
 */
public class SasMapper extends Mapper<LongWritable, Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //将文件的每一行传递过来,使用split分割后利用字符数组进行接收
        String[] stu = value.toString().split(",");
        //向Reducer传递参数-> Key:学生姓名 Value:成绩
        context.write(new Text(stu[1]),new Text(stu[2]));
    }
}
package sum_avg_sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SasReducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
        System.out.println("*********************************************************************");
        //定义一个ArrayList集合接收该学生的各项成绩
        List<Integer> scores = new ArrayList<>();
        for(Text value:values){
            scores.add(Integer.valueOf(value.toString()));
        }
        //对该学生的成绩进行求总分、平均分
        int num = 0, sum = 0;
        for(Integer score:scores){
            sum = sum + score.intValue();
            num = num + 1;
        }
        float avg = sum / num;
        //成绩排序
        Collections.sort(scores);
        //使用一个字符串拼接排好序的所有成绩
        String sort = "的总分:"+sum+" 平均分:"+avg+" 该生的成绩从低到高排序是:";
        for(Integer score:scores){
            sort = sort + score + "  ";
        }
        System.out.println(key.toString()+sort);
        //输出
        context.write(key,new Text(sort));
    }
}
package sum_avg_sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SasMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        //创建一个job和任务的入口
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SasMain.class);
        //设置mapper和reducer的入口
        job.setMapperClass(SasMapper.class);
        job.setReducerClass(SasReducer.class);
        //设置mapper输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //设置reducer的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //指定输入输出路径
        String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
        String outputPath = "hdfs://localhost:9000/mapreduce/output/每个学生总分平均分排序.txt";
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));
        //输出路径存在的话就删除,不然就只能手动删除,否则会报该文件已存在的异常
        FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }
        //执行job
        job.waitForCompletion(true);
    }
}

3.4.3 统计所有学生的信息(Si)

package student_info;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
stu[0]:课程名称
stu[1]:学生姓名
stu[2]:成绩
stu[3]:性别
stu[4]:年龄
该功能实现:统计所有学生课程考试信息
 */
public class SiMapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable Key1, Text value1,Context context) throws IOException, InterruptedException {
        //将文件的每一行传递过来,使用split分割后利用字符数组进行接收
        String[] splits= value1.toString().split(",");
        //拼接姓名+性别+年龄
        String name = splits[1];
        String sex = splits[3];
        String age = splits[4];
        String stu_info = name+"-"+sex+"-"+age;
        //拼接课程+成绩
        String course = splits[0];
        String score = splits[2];
        String course_info = course+"-"+score;
        //向Reducer传递参数-> Key:姓名+性别+年龄  Value:课程+成绩
        context.write(new Text(stu_info),new Text(course_info));
    }
}
package student_info;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SiReducer extends Reducer<Text, Text,Text, Text> {
    @Override
    protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
        //拼接学生各科考试成绩信息
        String scoreInfo = "";
        for(Text value:values){
            scoreInfo = scoreInfo + value+"   ";
        }
        System.out.println("********************************************************");
        System.out.println(key.toString()+"\n"+scoreInfo);
        context.write(key,new Text(scoreInfo));
    }
}
package student_info;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SiMain {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        //创建job和“统计相同课程相同分数的人数”任务入口
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SiMain.class);
        //设置Mapper和Reducer的入口
        job.setMapperClass(SiMapper.class);
        job.setReducerClass(SiReducer.class);
        //设置Mapper的输入输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //设置Reducer的输入输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //指定输入输出路径
        String inputPath = "hdfs://localhost:9000/mapreduce/input/学生成绩.csv";
        String outputPath = "hdfs://localhost:9000/mapreduce/output/学生信息.txt";
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outputPath));
        //输出路径存在的话就删除,不然就只能手动删除,否则会报该文件已存在的异常
        FileSystem fileSystem = FileSystem.get(new URI(outputPath), conf);
        if (fileSystem.exists(new Path(outputPath))) {
            fileSystem.delete(new Path(outputPath), true);
        }
        //执行job
        job.waitForCompletion(true);
    }
}
相关文章
|
1月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
50 7
|
8月前
|
分布式计算 Hadoop 数据挖掘
Hadoop生态系统介绍(一)
Hadoop生态系统介绍(一)
140 4
|
8月前
|
分布式计算 Hadoop 分布式数据库
Hadoop生态系统介绍(二)大数据技术Hadoop入门理论系列之一----hadoop生态圈介绍
Hadoop生态系统介绍(二)大数据技术Hadoop入门理论系列之一----hadoop生态圈介绍
225 2
|
6月前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
|
7月前
|
Java Maven
idea安装并使用maven依赖分析插件:Maven Helper
idea安装并使用maven依赖分析插件:Maven Helper
2843 7
|
7月前
|
分布式计算 Hadoop Linux
Hadoop检查本地文件系统:
【7月更文挑战第24天】
67 6
|
7月前
|
分布式计算 Hadoop
|
6月前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
450 0
|
7月前
|
存储 分布式计算 Hadoop
阿里巴巴飞天大数据架构体系与Hadoop生态系统的深度融合:构建高效、可扩展的数据处理平台
技术持续创新:随着新技术的不断涌现和应用场景的复杂化,阿里巴巴将继续投入研发力量推动技术创新和升级换代。 生态系统更加完善:Hadoop生态系统将继续扩展和完善,为用户提供更多元化、更灵活的数据处理工具和服务。
|
8月前
|
存储 SQL 分布式计算
Hadoop 生态系统
【6月更文挑战第20天】Hadoop 生态系统
86 3

热门文章

最新文章