Hadoop MapReduce编程 API入门系列之统计学生成绩版本2(十八)

简介:

 统计出每个年龄段的 男、女 学生的最高分

 

  这里,为了空格符的差错,直接,我们有时候,像如下这样的来排数据。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

复制代码
package zhouls.bigdata.myMapReduce.Gender;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 
* @function 统计不同年龄段内 男、女最高分数
*
*
*/

/*
Alice<tab>23<tab>female<tab>45
Bob<tab>34<tab>male<tab>89
Chris<tab>67<tab>male<tab>97
Kristine<tab>38<tab>female<tab>53
Connor<tab>25<tab>male<tab>27
Daniel<tab>78<tab>male<tab>95
James<tab>34<tab>male<tab>79
Alex<tab>52<tab>male<tab>69
Nancy<tab>7<tab>female<tab>98
Adam<tab>9<tab>male<tab>37
Jacob<tab>7<tab>male<tab>23
Mary<tab>6<tab>female<tab>93
Clara<tab>87<tab>female<tab>72
Monica<tab>56<tab>female<tab>92
*/
public class Gender extends Configured implements Tool {
/*
* 
* @function Mapper 解析输入数据,然后按需求输出
* @input key=行偏移量 value=学生数据
* @output key=gender value=name+age+score
* 
*/
public static class PCMapper extends Mapper<Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
{//拿Alice<tab>23<tab>female<tab>45
String[] tokens = value.toString().split("<tab>");//使用分隔符<tab>,将数据解析为数组 tokens
//得到Alice    23    female    45
//即tokens[0] tokens[1] tokens[2] tokens[3] 
String gender = tokens[2].toString();//性别
String nameAgeScore = tokens[0] + "\t" + tokens[1] + "\t"+ tokens[3];
//输出 key=gender value=name+age+score
//输出 key=female value=Alice    +23+45
context.write(new Text(gender), new Text(nameAgeScore));//将 (female , Alice+ 23+ 45) 写入到context中
}
}
public static class MyHashPartitioner extends Partitioner<Text, Text> 
{
/** Use {@link Object#hashCode()} to partition. */
@Override
public int getPartition(Text key, Text value,int numReduceTasks) 
{
return (key.hashCode()) % numReduceTasks;
}

}
/**
* 
* @function Partitioner 根据 age 选择 reduce 分区
*
*/
public static class PCPartitioner extends Partitioner<Text, Text> 
{

@Override
public int getPartition(Text key, Text value, int numReduceTasks) 
{
// TODO Auto-generated method stub
String[] nameAgeScore = value.toString().split("\t");
String age = nameAgeScore[1];//学生年龄
int ageInt = Integer.parseInt(age);//按年龄段分区

// 默认指定分区 0
if (numReduceTasks == 0)
return 0;

//年龄小于等于20,指定分区0
if (ageInt <= 20) {
return 0;
}
// 年龄大于20,小于等于50,指定分区1
if (ageInt > 20 && ageInt <= 50) {

return 1 % numReduceTasks;
}
// 剩余年龄,指定分区2
else
return 2 % numReduceTasks;
}
}

/**
* 
* @function 定义Combiner 合并 Mapper 输出结果
*
*/
public static class PCCombiner extends Reducer<Text, Text, Text, Text> 
{
private Text text = new Text();

public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException 
{
int maxScore = Integer.MIN_VALUE;
String name = " ";
String age = " ";
int score = 0;
for (Text val : values) 
{
String[] valTokens = val.toString().split("\\t");
score = Integer.parseInt(valTokens[2]);
if (score > maxScore) 
{
name = valTokens[0];
age = valTokens[1];
maxScore = score;
}
}
text.set(name + "\t" + age + "\t" + maxScore);
context.write(key, text);
}
}

/*
* 
* @function Reducer 统计出 不同年龄段、不同性别 的最高分
* input key=gender value=name+age+score
* output key=name value=age+gender+score
* 
*/
static class PCReducer extends Reducer<Text, Text, Text, Text> 
{
@Override
public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException 
{
int maxScore = Integer.MIN_VALUE;
String name = " ";
String age = " ";
String gender = " ";
int score = 0;
// 根据key,迭代 values 集合,求出最高分
for (Text val : values)
{
String[] valTokens = val.toString().split("\\t");
score = Integer.parseInt(valTokens[2]);
if (score > maxScore) 
{
name = valTokens[0];
age = valTokens[1];
gender = key.toString();
maxScore = score;
}
}
context.write(new Text(name), new Text("age- " + age + "\t" + gender + "\tscore-" + maxScore));
}
}

/**
* @function 任务驱动方法
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception 
{
// TODO Auto-generated method stub
Configuration conf = new Configuration();//读取配置文件

Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) 
{
hdfs.delete(mypath, true);
}

@SuppressWarnings("deprecation")
Job job = new Job(conf, "gender");//新建一个任务
job.setJarByClass(Gender.class);//主类
job.setMapperClass(PCMapper.class);//Mapper
job.setReducerClass(PCReducer.class);//Reducer

job.setPartitionerClass(MyHashPartitioner.class);
//job.setPartitionerClass(PCPartitioner.class);//设置Partitioner类
job.setNumReduceTasks(3);// reduce个数设置为3

job.setMapOutputKeyClass(Text.class);//map 输出key类型
job.setMapOutputValueClass(Text.class);//map 输出value类型

job.setCombinerClass(PCCombiner.class);//设置Combiner类

job.setOutputKeyClass(Text.class);//输出结果 key类型
job.setOutputValueClass(Text.class);//输出结果 value 类型

FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
job.waitForCompletion(true);//提交任务
return 0;
}
/**
* @function main 方法
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception
{
//    String[] args0 = {
//    "hdfs://HadoopMaster:9000/gender/gender.txt",
//    "hdfs://HadoopMaster:9000/out/partition/" };

String[] args0 = {
"./data/gender/gender.txt",
"./out/gender" };


int ec = ToolRunner.run(new Configuration(),new Gender(), args0);
System.exit(ec);
}
}
复制代码

 

 

 

 

    或者

    代码

复制代码
package com.dajiangtai.hadoop.junior;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 
 * @function 统计不同年龄段内    男、女最高分数
 * @author zhouls
 *
 */
 
 /*
Alice<tab>23<tab>female<tab>45
Bob<tab>34<tab>male<tab>89
Chris<tab>67<tab>male<tab>97
Kristine<tab>38<tab>female<tab>53
Connor<tab>25<tab>male<tab>27
Daniel<tab>78<tab>male<tab>95
James<tab>34<tab>male<tab>79
Alex<tab>52<tab>male<tab>69
Nancy<tab>7<tab>female<tab>98
Adam<tab>9<tab>male<tab>37
Jacob<tab>7<tab>male<tab>23
Mary<tab>6<tab>female<tab>93
Clara<tab>87<tab>female<tab>72
Monica<tab>56<tab>female<tab>92
*/
public class Gender extends Configured implements Tool {
    /*
     * 
     * @function Mapper 解析输入数据,然后按需求输出
     * @input  key=行偏移量   value=学生数据
     * @output key=gender  value=name+age+score
     * 
     */
    public static class PCMapper extends Mapper<Object, Text, Text, Text>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {//拿Alice<tab>23<tab>female<tab>45
            String[] tokens = value.toString().split("<tab>");//使用分隔符<tab>,将数据解析为数组 tokens
                            //得到Alice        23         female            45
                            //即tokens[0]   tokens[1]  tokens[2]  tokens[3] 
            String gender = tokens[2].toString();//性别
            String nameAgeScore = tokens[0] + "\t" + tokens[1] + "\t"+ tokens[3];
            //输出  key=gender  value=name+age+score
            //输出     key=female  value=Alice    +23+45
            context.write(new Text(gender), new Text(nameAgeScore));//将 (female , Alice+  23+ 45) 写入到context中
        }
    }
    public static class MyHashPartitioner extends Partitioner<Text, Text> 
    {
          /** Use {@link Object#hashCode()} to partition. */
          @Override
          public int getPartition(Text key, Text value,int numReduceTasks) 
          {
            return (key.hashCode()) % numReduceTasks;
          }

        }
    /**
     * 
     * @function Partitioner 根据 age 选择 reduce 分区
     *
     */
    public static class PCPartitioner extends Partitioner<Text, Text> 
    {

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) 
        {
            // TODO Auto-generated method stub
            String[] nameAgeScore = value.toString().split("\t");
            String age = nameAgeScore[1];//学生年龄
            int ageInt = Integer.parseInt(age);//按年龄段分区

            // 默认指定分区 0
            if (numReduceTasks == 0)
                return 0;

            //年龄小于等于20,指定分区0
            if (ageInt <= 20) {
                return 0;
            }
            // 年龄大于20,小于等于50,指定分区1
            if (ageInt > 20 && ageInt <= 50) {

                return 1 % numReduceTasks;
            }
            // 剩余年龄,指定分区2
            else
                return 2 % numReduceTasks;
        }
    }

    /**
     * 
     * @function 定义Combiner 合并 Mapper 输出结果
     *
     */
    public static class PCCombiner extends Reducer<Text, Text, Text, Text> 
    {
        private Text text = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException 
        {
            int maxScore = Integer.MIN_VALUE;
            String name = " ";
            String age = " ";
            int score = 0;
            for (Text val : values) 
            {
                String[] valTokens = val.toString().split("\\t");
                score = Integer.parseInt(valTokens[2]);
                if (score > maxScore) 
                {
                    name = valTokens[0];
                    age = valTokens[1];
                    maxScore = score;
                }
            }
            text.set(name + "\t" + age + "\t" + maxScore);
            context.write(key, text);
        }
    }

    /*
     * 
     * @function Reducer 统计出 不同年龄段、不同性别 的最高分
     * input key=gender value=name+age+score
     * output key=name value=age+gender+score
     * 
     */
    static class PCReducer extends Reducer<Text, Text, Text, Text>  
    {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException 
        {
            int maxScore = Integer.MIN_VALUE;
            String name = " ";
            String age = " ";
            String gender = " ";
            int score = 0;
            // 根据key,迭代 values 集合,求出最高分
            for (Text val : values)
                {
                String[] valTokens = val.toString().split("\\t");
                score = Integer.parseInt(valTokens[2]);
                if (score > maxScore) 
                {
                    name = valTokens[0];
                    age = valTokens[1];
                    gender = key.toString();
                    maxScore = score;
                }
            }
            context.write(new Text(name), new Text("age- " + age + "\t" + gender + "\tscore-" + maxScore));
        }
    }

    /**
     * @function 任务驱动方法
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception 
    {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();//读取配置文件

        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) 
        {
            hdfs.delete(mypath, true);
        }

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "gender");//新建一个任务
        job.setJarByClass(Gender.class);//主类
        job.setMapperClass(PCMapper.class);//Mapper
        job.setReducerClass(PCReducer.class);//Reducer

        job.setPartitionerClass(MyHashPartitioner.class);
        //job.setPartitionerClass(PCPartitioner.class);//设置Partitioner类
        job.setNumReduceTasks(3);// reduce个数设置为3

        job.setMapOutputKeyClass(Text.class);//map 输出key类型
        job.setMapOutputValueClass(Text.class);//map 输出value类型

        job.setCombinerClass(PCCombiner.class);//设置Combiner类

        job.setOutputKeyClass(Text.class);//输出结果 key类型
        job.setOutputValueClass(Text.class);//输出结果 value 类型

        FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
        job.waitForCompletion(true);//提交任务
        return 0;
    }
    /**
     * @function main 方法
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception
    {
        String[] args0 = {
                "hdfs://master:9000/middle/partition/gender.txt",
                "hdfs://master:9000/middle/partition/out/" };
        int ec = ToolRunner.run(new Configuration(),new Gender(), args0);
        System.exit(ec);
    }
}
复制代码

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165704.html,如需转载请自行联系原作者

相关文章
|
4月前
|
JSON 安全 API
电商API入门问答:开发者必知的10个基础问题
本文详解电商API的10个基础知识,涵盖定义、用途、认证、安全等内容,帮助开发者快速入门并提升开发效率。
142 0
|
4月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
231 0
|
1月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
7月前
|
JSON 算法 API
一文掌握 1688 商品详情 API 接口:从入门到实战
1688是国内领先的综合电商批发平台,提供海量商品资源。其商品详情API助力开发者与企业获取商品的详细信息(如属性、价格、库存等),广泛应用于电商数据分析、比价系统及采购场景。API支持GET/POST请求,需传入通用参数(app_key、timestamp等)与业务参数(如product_id)。返回JSON格式数据,包含商品标题、价格、图片链接等详情,提升业务效率与决策精准度。
|
7月前
|
搜索推荐 API 开发者
京东商品列表 API 接口全解析:从入门到精通
京东商品列表API是京东开放平台为开发者提供的核心数据接口,支持批量获取商品基础信息、价格、库存状态等多维度数据。它具备数据丰富性、灵活筛选与分页查询、稳定高效等特点,可满足市场分析、选品优化、比价工具及推荐系统开发等需求,为电商业务创新提供坚实支撑。通过标准化通道,助力第三方高效、合法地利用京东海量商品数据。
|
6月前
|
JSON API 开发工具
电商API接口入门指南
本文介绍了API的基础知识及其在电商领域的实际应用。首先,阐释了API的概念、运作机制及参数与返回值的作用,帮助读者理解如何通过API实现软件间的交互。接着,以获取电商商品列表为例,详细讲解了从选择平台、引入SDK到编写代码调用API的全流程。示例代码采用Python语言,利用requests库发送请求并解析JSON数据,为开发者提供了清晰的实践指导。
|
4月前
|
存储 安全 API
亚马逊SP-API入门:海外电商接口调用与国内平台的差异化
亚马逊 SP-API 与国内电商 API 在技术架构、安全机制及开发流程上差异显著。本文对比京东、淘宝等平台,分析接口设计、地域适配、权限管理等核心差异,并结合实战经验提供开发建议,助力开发者高效接入 SP-API,实现全球电商业务拓展。
|
6月前
|
数据挖掘 API 开发者
京东商品详情 API 接口全攻略:从入门到精通
京东商品详情API接口是京东开放平台为开发者提供的服务,用于获取商品详细信息。通过调用接口,开发者可获得商品属性、价格、库存、促销信息等数据,适用于电商应用、价格比较工具及数据分析平台等场景。支持GET/POST请求方式,参数包括API版本、密钥等。示例代码展示了如何使用Python的requests库调用该接口,并获取JSON格式的返回数据,包含商品基本信息、价格、库存和用户评价等内容。
292 16
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
510 7
|
前端开发 JavaScript 安全
入门Vue+.NET 8 Web Api记录(一)
入门Vue+.NET 8 Web Api记录(一)
648 5