Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

复制代码
package zhouls.bigdata.myMapReduce.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyKey implements WritableComparable<MyKey>{
    //WritableComparable,实现这个方法,要多很多
    //readFields是读入,write是写出
    private int year;
    private int month;
    private double hot;
    public int getYear() {
    return year;
}

    public void setYear(int year) {
        this.year = year;
    }
    
    public int getMonth() {
        return month;
    }
    
    public void setMonth(int month) {
        this.month = month;
    }
    
    public double getHot() {
        return hot;
    }
    
    public void setHot(double hot) {
        this.hot = hot;
        }//这一大段的get和set,可以右键,source,产生get和set,自动生成。


    public void readFields(DataInput arg0) throws IOException { //反序列化
        this.year=arg0.readInt();
        this.month=arg0.readInt();
        this.hot=arg0.readDouble();
    }
    
    public void write(DataOutput arg0) throws IOException { //序列化
        arg0.writeInt(year);
        arg0.writeInt(month);
        arg0.writeDouble(hot);
    }

    //判断对象是否是同一个对象,当该对象作为输出的key
    public int compareTo(MyKey o) {
        int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年
        if(r1==0){
        int r2 =Integer.compare(this.month, o.getMonth());
        if(r2==0){
            return Double.compare(this.hot, o.getHot());
        }else{
            return r2;
        }
        }else{
            return r1;
        }
    }

}
复制代码

 

 

 

 

 

 

 

 

 

 

 

 

 

复制代码
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{//这里就是洗牌

    //执行时间越短越好
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949
    }
}


//1949-10-01 14:21:02    34c
//1949-10-02 14:01:02    36c
//1950-01-01 11:21:02    32c
//1950-10-01 12:21:02    37c
//1951-12-01 12:21:02    23c
//1950-10-02 12:21:02    41c
//1950-10-03 12:21:02    27c
//1951-07-01 12:21:02    45c
//1951-07-02 12:21:02    46c
//1951-07-03 12:21:03    47c

 
复制代码

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

复制代码
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySort extends WritableComparator{

    public MySort(){
        super(MyKey.class,true);//把MyKey传进了
    }

    public int compare(WritableComparable a, WritableComparable b) {//这是排序的精髓
        MyKey k1 =(MyKey) a;
        MyKey k2 =(MyKey) b;
        int r1 =Integer.compare(k1.getYear(), k2.getYear());
        if(r1==0){//年相同
        int r2 =Integer.compare(k1.getMonth(), k2.getMonth());
        if(r2==0){//月相同
            return -Double.compare(k1.getHot(), k2.getHot());//比较气温
        }else{
            return r2;
        }
        }else{
            return r1;
        }

    }
}
复制代码

 

 

 

 

 

 

 

复制代码
package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator{

    public MyGroup(){
        super(MyKey.class,true);//把MyKey传进了
}

    public int compare(WritableComparable a, WritableComparable b) {//这是分组的精髓
        MyKey k1 =(MyKey) a;
        MyKey k2 =(MyKey) b;
        int r1 =Integer.compare(k1.getYear(), k2.getYear());
    if(r1==0){
        return Integer.compare(k1.getMonth(), k2.getMonth());
    }else{
        return r1;
    }

    }
}

 
复制代码

 

 

 

 

 

 

复制代码
package zhouls.bigdata.myMapReduce.weather;


import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {


//    1949-10-01 14:21:02    34c WeatherMapper
//    1949-10-02 14:01:02    36c
//    1950-01-01 11:21:02    32c    分区在MyPartitioner.java 
//    1950-10-01 12:21:02    37c
//    1951-12-01 12:21:02    23c    排序在MySort.java
//    1950-10-02 12:21:02    41c
//    1950-10-03 12:21:02    27c    分组在MyGroup.java
//    1951-07-01 12:21:02    45c
//    1951-07-02 12:21:02    46c    再,WeatherReducer
//    1951-07-03 12:21:03    47c

//key:每行第一个隔开符(制表符)左边为key,右边为value    自定义类型MyKey,洗牌,    
    static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
    SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    NullWritable v =NullWritable.get();
//    1949-10-01 14:21:02是自定义类型MyKey,即key
//    34c是DoubleWritable,即value

    protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
    try {
        Date date =sdf.parse(key.toString());
        Calendar c =Calendar.getInstance();
        //Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象,
        //此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance();    
        c.setTime(date);
        int year =c.get(Calendar.YEAR);
        int month =c.get(Calendar.MONTH);

        double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
        MyKey k =new MyKey();
        k.setYear(year);
        k.setMonth(month);
        k.setHot(hot);
        context.write(k, new DoubleWritable(hot));
    } catch (Exception e) {
        e.printStackTrace();
    }
    }
}

    static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
    protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,Context arg2)throws IOException, InterruptedException {
        int i=0;
        for(DoubleWritable v :arg1){
        i++;
        String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符
        arg2.write(new Text(msg), NullWritable.get());
                if(i==3){
                    break;
                }
        }
    }
}

public static void main(String[] args) {
    Configuration config =new Configuration();
//    config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
//    config.set("yarn.resourcemanager.hostname", "HadoopMaster");
//    config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//    config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如","
    try {
        FileSystem fs =FileSystem.get(config);

        Job job =Job.getInstance(config);
        job.setJarByClass(RunJob.class);

        job.setJobName("weather");

        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReducer.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);

        job.setNumReduceTasks(3);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

//    FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt
//    
//    Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather");

        FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt

    Path outpath =new Path("./out/weather");

    if(fs.exists(outpath)){
        fs.delete(outpath, true);
    }
    FileOutputFormat.setOutputPath(job, outpath);

        boolean f= job.waitForCompletion(true);
        if(f){
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    }

}
复制代码

 

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6164729.html,如需转载请自行联系原作者

相关文章
|
6天前
|
数据采集 监控 数据挖掘
常用电商商品数据API接口(item get)概述,数据分析以及上货
电商商品数据API接口(item get)是电商平台上用于提供商品详细信息的接口。这些接口允许开发者或系统以编程方式获取商品的详细信息,包括但不限于商品的标题、价格、库存、图片、销量、规格参数、用户评价等。这些信息对于电商业务来说至关重要,是商品数据分析、价格监控、上货策略制定等工作的基础。
|
24天前
|
API 网络安全
发送UDP数据免费API接口教程
此API用于向指定主机发送UDP数据,支持POST或GET请求。需提供用户ID、密钥、接收IP及端口、数据内容等参数。返回状态码和信息提示。示例中含公共ID与KEY,建议使用个人凭证以提高调用频率。
44 13
|
24天前
|
网络协议 API 网络安全
发送TCP数据免费API接口教程
此API用于向指定主机发送TCP数据,支持POST/GET请求,需提供用户ID、KEY、接收IP、端口及数据内容。返回状态码和信息提示,示例如下:{&quot;code&quot;:200,&quot;msg&quot;:&quot;发送成功!&quot;}。详情见:https://www.apihz.cn/api/datacstcp.html
35 11
|
1月前
|
人工智能 关系型数据库 MySQL
数据魔力,一触即发 —— Dataphin数据服务API,百炼插件新星降临!
本文通过一个利用百炼大模型平台和Dataphin数据服务API构建一个客户360智能应用的案例,介绍如何使用Dataphin数据服务API在百炼平台创建一个自定义插件,用于智能应用的开发,提升企业智能化应用水平。
130 3
数据魔力,一触即发 —— Dataphin数据服务API,百炼插件新星降临!
|
24天前
|
API 数据安全/隐私保护 开发者
实时获取小红书详情 API 数据
小红书详情API数据获取指南:注册开发者账号,创建应用并申请接口权限,构建请求获取笔记详情,使用Python等语言处理响应数据。需遵守使用规则,注意调用频率和数据安全。
|
1月前
|
XML 数据可视化 API
商品详情数据实战案例,API接口系列
淘宝商品详情数据在电商领域具有广泛的应用价值,而淘宝商品详情API接口则为开发者提供了获取这些数据的重要途径。通过合理利用这些接口和数据,可以提升业务效率、优化用户体验,为电商行业的发展注入新的活力。
|
1月前
|
SQL 缓存 API
在API接口数据获取过程中,如何确保数据的安全性和隐私性?
在API接口数据获取过程中,确保数据的安全性和隐私性至关重要。本文介绍了身份认证与授权、防止SQL注入和XSS攻击、加密传输、API版本控制、限流与熔断、压力测试与性能优化、备份与恢复以及法律和伦理考量等关键措施,帮助开发者和管理者有效保护API接口的数据安全和隐私性。
|
1月前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
1月前
|
JSON 前端开发 JavaScript
API接口商品详情接口数据解析
商品详情接口通常用于提供特定商品的详细信息,这些信息比商品列表接口中的信息更加详细和全面。以下是一个示例的JSON数据格式,用于表示一个商品详情API接口的响应。这个示例假定API返回一个包含商品详细信息的对象。
|
1月前
|
存储 监控 安全
API接口数据获取全流程用户指南
本文介绍了从明确需求到数据存储与管理的API接口数据获取全流程。首先,明确业务需求和选择合适的数据源;接着,准备API接口,包括审查文档、申请密钥和安全存储;然后,构建与发送请求,处理响应与数据;最后,进行数据存储与管理,并持续监控与优化,确保数据的安全与合规。通过这些步骤,用户可以高效地获取和管理数据,为数据分析和业务优化提供支持。