MapReduce计算广州2022年每月最高温度

简介: MapReduce计算广州2022年每月最高温度

数据集

1.查询地区编号

NCDC是美国国家气象数据中心的缩写,是一个负责收集、存储和分发全球气象和气候数据的组织。

我们通过NCDC查询地区对应的编号,参考这里

2.数据集的下载

打开资源管理器并输入路径:

ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-lite/

打开目录 "2022",查询编号(比如广州="592870"),找到文件 "592870-9999-20222.gz"复制到本地即可。

通过解压工具解压得到文件"592870-9999-20222",修改后缀为txt。

 


注意这里的温度都是经过*10后的格式!

编写MapReduce程序

输入格式

2022 01 01 00   107    99 10265   339    11 -9999 -9999 -9999
2022 01 01 01   150   100 -9999 -9999    10 -9999 -9999 -9999
2022 01 01 02   160   100 -9999 -9999    10 -9999 -9999 -9999
2022 01 01 03   176   100 10270   354    20 -9999 -9999 -9999
2022 01 01 04   190    80 -9999 -9999    10     0 -9999 -9999
2022 01 01 05   210    80 -9999 -9999    20     0 -9999 -9999
2022 01 01 06   216   104 10234    19    11 -9999 -9999 -9999
2022 01 01 07   220   100 -9999 -9999    10 -9999 -9999 -9999
2022 01 01 08   210    90 -9999   270    20 -9999 -9999 -9999
2022 01 01 09   211   108 10229   331    25 -9999 -9999 -9999
2022 01 01 10   190    90 -9999   340    50 -9999 -9999 -9999
2022 01 01 11   190    90 -9999   340    50 -9999 -9999 -9999
...共8700行数据

输出格式

01  260
02  360
03  310
...共12行数据

Mapper类

确定参数

  • KEY_IN:使用默认的TextInputFormat,所以 KEY_IN 为每一行的字节偏移量 ,为LongWritable类型。
  • VALUE_IN:使用默认的TextInputFormat,所以 VALUE_IN为对应的一行文本,为Text型。
  • KEY_OUT:我们统计每个月的最高温度,所以以月份为map函数输出的键 KEY_IN,为Text型。
  • VALUE_OUT:每一行数据中的温度作为map函数输出的值,为 IntWritable型。

代码

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import org.checkerframework.checker.units.qual.K;
import java.io.IOException;
public class MaxTempMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text KEY_OUT = new Text();
    private IntWritable VALUE_OUT = new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获得数据行
        String line = value.toString();
        System.out.println(line);
        //转为数组
        String[] words = StringUtils.split(line,' ');
        //获取月份
        String month = words[1];
        //获取温度
        String temp = line.substring(16,19);
        temp = temp.trim();
        System.out.println(month+"    "+temp);
        //设置输出的 键和值
        KEY_OUT.set(month);
        VALUE_OUT.set(Integer.parseInt(temp));
        //写出
        context.write(KEY_OUT,VALUE_OUT);
    }
}

Reducer类

map函数的输出键值对即为reduce函数的输入键值对,所以:4

KEY_IN:Text类型。

VALUE_IN:IntWritable型。

KEY_OUT:Text型。

VALUE_OUT:IntWritable型。

思路

因为每个reduce方法每次处理的都是同一KEY(同一个月)的键值对,我们只需要定义一个变量maxTmp来不断更新最大的温度值即可。

代码

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MaxTempReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable OUT_VALUE = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int maxTemp = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxTemp = Math.max(value.get(),maxTemp);
        }
        //设置输出值
        OUT_VALUE.set(maxTemp);
        //写出
        context.write(key,OUT_VALUE);
    }
}

Runner类

注意:输出目录不可存在!

import com.lyh.mapreduce.conbineTextInputFormat.WordCountMapper;
import com.lyh.mapreduce.conbineTextInputFormat.WordCountReducer;
import com.lyh.mapreduce.conbineTextInputFormat.WordCountRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MaxTempRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new MaxTempRunner(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max temperature count");
        //2.配置jar包路径
        job.setJarByClass(MaxTempRunner.class);
        //3.关联mapper和reducer
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);
        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\temperature\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\temperature\\output"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

运行结果

01  260
02  260
03  310
04  330
05  350
06  370
07  400
08  380
09  370
10  370
11  320
12  220


相关文章
|
分布式计算
使用MapReduce计算用户流量使用情况
使用MapReduce计算用户流量使用情况
使用MapReduce计算用户流量使用情况
|
存储 缓存 分布式计算
大数据计算的基石——MapReduce
MapReduce Google File System提供了大数据存储的方案,这也为后来HDFS提供了理论依据,但是在大数据存储之上的大数据计算则不得不提到MapReduce。 虽然现在通过框架的不断发展,MapReduce已经渐渐的淡出人们的视野,越来越多的框架提供了简单的SQL语法来进行大数据计算。但是,MapReduce所提供的编程模型为这一切奠定了基础,所以Google的这篇MapReduce 论文值得我们去认真的研读。
241 0
大数据计算的基石——MapReduce
|
分布式计算 资源调度 算法
【大数据计算】(三) MapReduce的安装和基础编程
目录 1.词频统计任务要求 1.1 MapReduce程序编写方法 1.1.1 编写Map处理逻辑 1.1.2 编写Reduce处理逻辑 1.1.3 编写main方法 2 完整的词频统计程序 3. 编译打包程序 3.1 使用命令行编译打包词频统计程序 3.2 使用IDEA编译打包词频统计程序 4. 运行程序 5. 编程题 5.1 根据附件的数据文件flow_data.dat , 编程完成下面需求: 5.2 附加题(选做) 6. 福利送书 最后
356 0
【大数据计算】(三) MapReduce的安装和基础编程
|
分布式计算 Java 调度
使用MapReduce计算用户流量使用情况
使用MapReduce计算用户流量使用情况
使用MapReduce计算用户流量使用情况
|
分布式计算 Java 大数据
Spark和MapReduce任务计算模型
【前言:本文主要从任务处理的运行模式为角度,分析Spark计算模型,希望帮助大家对Spark有一个更深入的了解。同时拿MapReduce和Spark计算模型做对比,强化对Spark和MapReduce理解】
|
分布式计算 Hadoop 数据格式
|
分布式计算 Hadoop Shell
|
存储 分布式计算 Hadoop
MapReduce计算框架
MapReduce计算框架 一、MapReduce实现原理   图展示了MapReduce实现中的全部流程,处理步骤如下:   1、用户程序中的MapReduce函数库首先把输入文件分成M块(每块大小默认64M),在集群上执行处理程序,见序号1   2、主控程序master分配Map任务和Reduce任务给工作执行机器worker。
2007 0
|
4月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
40 1