0x00 Tutorial Contents
- Project preparation
- Writing the code
- Running the code and observing the results
0x01 Project Preparation
1. Create a Maven Project
(1) Create the project
(2) Add the Hadoop-related dependency
```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
</dependency>
```
(3) Copy the data into the project
Sample data:
```
Nehru,2016-01-01
Dane,2016-01-01
Walter,2016-01-01
Gloria,2016-01-01
Clarke,2016-01-01
Madeline,2016-01-01
Kevyn,2016-01-01
```
Data description:
1. The file name is user_login.txt
2. There are only two fields: a name and a login date
3. The field delimiter is a comma (,), as shown in the parsing sketch below
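To make the record format concrete, here is a minimal standalone sketch (the ParseDemo class and the sample line are only illustrative) showing how one line of user_login.txt splits into its two fields; the mapper in the code below parses its input the same way.

```java
public class ParseDemo {
    public static void main(String[] args) {
        // One record from user_login.txt: a name and a login date, separated by a comma
        String line = "Nehru,2016-01-01";
        String[] fields = line.split(",");
        String name = fields[0];  // "Nehru"
        String date = fields[1];  // "2016-01-01"
        System.out.println(name + " logged in on " + date);
    }
}
```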
2. Requirements
(1) Requirement 1: count by date, that is, count how many people in total logged in on each day
(2) Requirement 2: sort by date, that is, sort the dates by number of logins from lowest to highest (a sketch of one possible approach follows this list)
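For requirement 2, a common pattern is to run a second MapReduce job over the output of requirement 1 and let the shuffle phase do the sorting: emit the login count as the key, and the dates come back ordered by count, lowest first. The following is only a minimal sketch of that idea, not the tutorial's definitive implementation; it assumes the input lines use the default tab separator produced by the first job (e.g. `2016-01-01<TAB>7`), and the class names are illustrative.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper for a second job whose input is the output of requirement 1,
// i.e. lines of the form "2016-01-01<TAB>7" (assumed format).
public class SortByCountMapper extends Mapper<Object, Text, IntWritable, Text> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Emit (count, date): MapReduce sorts map output keys,
        // so the reducer receives dates ordered by login count, ascending.
        context.write(new IntWritable(Integer.parseInt(fields[1])), new Text(fields[0]));
    }
}

// Reducer that swaps the pair back to (date, count), preserving the sorted order.
class SortByCountReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text date : values) {
            context.write(date, key);
        }
    }
}
```

The driver for such a job would mirror the one shown in the next section, with the map output key class set to IntWritable and the map output value class set to Text.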
0x02 Writing the Code
1. Requirement 1: count by date
The complete code is as follows:
```java
package com.shaonaiyi.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class dailyAccessCount {

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Split on the comma
            String[] array = line.split(",");
            // Use the date as the key
            String keyOutput = array[1];
            // Output format: (date, 1)
            context.write(new Text(keyOutput), one);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Holds the final count for each date
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulator, initialised to 0
            int sum = 0;
            // Add up all the 1s the mappers emitted for this date
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Store the total in result
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // Fewer than two arguments: print usage and exit
            System.err.println("Usage: dailyAccessCount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Daily Access Count");
        job.setJarByClass(dailyAccessCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // All arguments except the last one are input paths
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        // The last argument is the output path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```