一、需求分析
1.网站数据分析的四个指标:
PV:PageView ,浏览量
用户每打开一个网页就会被记录1次浏览量,多次打开同一个页面浏览量累计加一
UV:Unique Visitor 独立访客数
同一用户多次访问,独立访客数只算一次
VV:visitor view,访客的访问次数
同一用户完成浏览并关闭该网站时,访客的访问次数算一次
IP:独立IP数
同一IP不管访问了几个页面,独立IP数均为1
2.各个省份PV的统计:
我们的需求分析是统计网站日志文件中的各省份ID出现的次数,比如客户A访问了该网站,省份ID是5,客户B的省份ID是6,客户C的省份ID是8,客户D的省份ID是10,客户E的省份ID是5,客户F的省份ID是6…这样分析类比过来其实就是词频统计,得出的结果也是ID为5的有2人,ID为6的为2人,ID为8的为1人…
根据需求分析,下面为mapreduce各阶段的数据类型
map -> input <LongWritable, Text> map -> output <IntWritable, IntWritable> reduce -> input <IntWritable, IntWritable> reduce -> output <IntWritable, IntWritable>
二、程序编写
在编写程序之前,我们需要分析一下每行数据各字段的含义,在编写程序的过程中对数据进行预处理
示例代码:
package com.kfk.hadoop.mr; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */ public class WebPV extends Configured implements Tool { /** * map * TODO */ public static class WebPvMapper extends Mapper<LongWritable, Text,IntWritable, IntWritable>{ // 创建map输出的对象 private static final IntWritable mapOutValue = new IntWritable(1); private static final IntWritable mapOutKey = new IntWritable(); @Override public void setup(Context context) throws IOException, InterruptedException { // TODO } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 每一行数据按"\t"分割 String[] values = value.toString().split("\t"); // 数据预处理:每一行数据少于30个字段就过滤掉 if (values.length<30){ // 统计每一行数据少于30个字段的个数 context.getCounter("WEBPV_COUNTERS","LENGTH_LT30_COUNTER").increment(1); return; } String provinceIdValue = values[23]; String url = values[1]; // provinceIdValue为空的话将数据过滤掉 if (StringUtils.isBlank(provinceIdValue)){ // 统计provinceIdValue为空的个数 context.getCounter("WEBPV_COUNTERS","PROVINCEID_ISBLACK_COUNTER").increment(1); return; } // url为空的话。将数据过滤掉 if (StringUtils.isBlank(url)){ // 统计url为空的个数 context.getCounter("WEBPV_COUNTERS","URL_ISBLACK_COUNTER").increment(1); return; } // 如果provinceIdValue不能转换成Integer型,则将数据过滤 int provinceId = 0; try{ provinceId = Integer.valueOf(provinceIdValue); }catch (Exception e){ // 统计provinceIdValue不能转换成Integer型的个数 context.getCounter("WEBPV_COUNTERS","PROVINCEID_VALIDATE_COUNTER").increment(1); return; } // 将provinceId设置为map输出的key mapOutKey.set(provinceId); // map输出端的key和value context.write(mapOutKey,mapOutValue); } @Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO } } /** * reduce * TODO */ public static class WebPvReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{ // 创建reduce输出的对象 private static final IntWritable reduceOutValue = new IntWritable(); @Override public void setup(Context context) throws IOException, InterruptedException { // TODO } @Override public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 对reduce输入的value求和 int sum = 0; for (IntWritable value:values){ sum += value.get(); } // 将sum设置为reduce输出的key reduceOutValue.set(sum); // reduce输出端的key和value context.write(key,reduceOutValue); // 打印出reduce输出端的key和value的值 System.out.println("Reduce out == KeyOut: "+key+" ValueOut: "+reduceOutValue); } @Override public void cleanup(Context context) throws IOException, InterruptedException { // TODO } } /** * run * @param args * @return * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1) get conf Configuration configuration = this.getConf(); // 2) create job Job job = Job.getInstance(configuration,this.getClass().getSimpleName()); job.setJarByClass(this.getClass()); // 3.1) input,指定job的输入 Path path = new Path(args[0]); FileInputFormat.addInputPath(job,path); // 3.2) map,指定job的mapper和输出的类型 job.setMapperClass(WebPvMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); // 1.分区 // job.setPartitionerClass(); // 2.排序 // job.setSortComparatorClass(); // 3.combiner -可选项 job.setCombinerClass(WebPvReducer.class); // 4.compress -可配置 // configuration.set("mapreduce.map.output.compress","true"); // 使用的SnappyCodec压缩算法 // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec"); // 5.分组 // job.setGroupingComparatorClass(); // 6.设置reduce的数量 // job.setNumReduceTasks(2); // 3.3) reduce,指定job的reducer和输出类型 job.setReducerClass(WebPvReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // 3.4) output,指定job的输出 Path outpath = new Path(args[1]); FileOutputFormat.setOutputPath(job,outpath); // 4) commit,执行job boolean isSuccess = job.waitForCompletion(true); // 如果正常执行返回0,否则返回1 return (isSuccess) ? 0 : 1; } public static void main(String[] args) { // 添加输入,输入参数 args = new String[]{ "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/2015082818", "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output" }; // WordCountUpMR wordCountUpMR = new WordCountUpMR(); Configuration configuration = new Configuration(); try { // 判断输出的文件存不存在,如果存在就将它删除 Path fileOutPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if (fileSystem.exists(fileOutPath)){ fileSystem.delete(fileOutPath,true); } // 调用run方法 int status = ToolRunner.run(configuration,new WebPV(),args); // 退出程序 System.exit(status); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
运行结果:
注意:No reduce task的情况
当没有reduce任务的时候,combiner是不生效。但是,map端的shuffle过程是有的。