数据集
1.查询地区编号
NCDC是美国国家气象数据中心的缩写,是一个负责收集、存储和分发全球气象和气候数据的组织。
我们通过NCDC查询地区对应的编号,参考这里。
2.数据集的下载
打开资源管理器并输入路径:
ftp://ftp.ncdc.noaa.gov/pub/data/noaa/isd-lite/
打开目录 "2022",查询编号(比如广州="592870"),找到文件 "592870-9999-20222.gz"复制到本地即可。
通过解压工具解压得到文件"592870-9999-20222",修改后缀为txt。
注意:这里的温度都是经过*10后的格式!
编写MapReduce程序
输入格式
2022 01 01 00 107 99 10265 339 11 -9999 -9999 -9999 2022 01 01 01 150 100 -9999 -9999 10 -9999 -9999 -9999 2022 01 01 02 160 100 -9999 -9999 10 -9999 -9999 -9999 2022 01 01 03 176 100 10270 354 20 -9999 -9999 -9999 2022 01 01 04 190 80 -9999 -9999 10 0 -9999 -9999 2022 01 01 05 210 80 -9999 -9999 20 0 -9999 -9999 2022 01 01 06 216 104 10234 19 11 -9999 -9999 -9999 2022 01 01 07 220 100 -9999 -9999 10 -9999 -9999 -9999 2022 01 01 08 210 90 -9999 270 20 -9999 -9999 -9999 2022 01 01 09 211 108 10229 331 25 -9999 -9999 -9999 2022 01 01 10 190 90 -9999 340 50 -9999 -9999 -9999 2022 01 01 11 190 90 -9999 340 50 -9999 -9999 -9999 ...共8700行数据
输出格式
01 260 02 360 03 310 ...共12行数据
Mapper类
确定参数
- KEY_IN:使用默认的TextInputFormat,所以 KEY_IN 为每一行的字节偏移量 ,为LongWritable类型。
- VALUE_IN:使用默认的TextInputFormat,所以 VALUE_IN为对应的一行文本,为Text型。
- KEY_OUT:我们统计每个月的最高温度,所以以月份为map函数输出的键 KEY_IN,为Text型。
- VALUE_OUT:每一行数据中的温度作为map函数输出的值,为 IntWritable型。
代码
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; import org.checkerframework.checker.units.qual.K; import java.io.IOException; public class MaxTempMapper extends Mapper<LongWritable, Text,Text, IntWritable> { private Text KEY_OUT = new Text(); private IntWritable VALUE_OUT = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获得数据行 String line = value.toString(); System.out.println(line); //转为数组 String[] words = StringUtils.split(line,' '); //获取月份 String month = words[1]; //获取温度 String temp = line.substring(16,19); temp = temp.trim(); System.out.println(month+" "+temp); //设置输出的 键和值 KEY_OUT.set(month); VALUE_OUT.set(Integer.parseInt(temp)); //写出 context.write(KEY_OUT,VALUE_OUT); } }
Reducer类
map函数的输出键值对即为reduce函数的输入键值对,所以:4
KEY_IN:Text类型。
VALUE_IN:IntWritable型。
KEY_OUT:Text型。
VALUE_OUT:IntWritable型。
思路
因为每个reduce方法每次处理的都是同一KEY(同一个月)的键值对,我们只需要定义一个变量maxTmp来不断更新最大的温度值即可。
代码
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MaxTempReducer extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable OUT_VALUE = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxTemp = Integer.MIN_VALUE; for (IntWritable value : values) { maxTemp = Math.max(value.get(),maxTemp); } //设置输出值 OUT_VALUE.set(maxTemp); //写出 context.write(key,OUT_VALUE); } }
Runner类
注意:输出目录不可存在!
import com.lyh.mapreduce.conbineTextInputFormat.WordCountMapper; import com.lyh.mapreduce.conbineTextInputFormat.WordCountReducer; import com.lyh.mapreduce.conbineTextInputFormat.WordCountRunner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MaxTempRunner extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(),new MaxTempRunner(),args); } @Override public int run(String[] args) throws Exception { //1.获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "max temperature count"); //2.配置jar包路径 job.setJarByClass(MaxTempRunner.class); //3.关联mapper和reducer job.setMapperClass(MaxTempMapper.class); job.setReducerClass(MaxTempReducer.class); //4.设置map、reduce输出的k、v类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径 FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\temperature\\input")); //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径 FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\temperature\\output")); return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息 } }
运行结果
01 260 02 260 03 310 04 330 05 350 06 370 07 400 08 380 09 370 10 370 11 320 12 220