MapReduce编程案例之电商网站日志的行为分析

简介: 笔记

一、需求分析


1.网站数据分析的四个指标:


PV:PageView ,浏览量


用户每打开一个网页就会被记录1次浏览量,多次打开同一个页面浏览量累计加一


UV:Unique Visitor 独立访客数


同一用户多次访问,独立访客数只算一次


VV:visitor view,访客的访问次数


同一用户完成浏览并关闭该网站时,访客的访问次数算一次


IP:独立IP数


同一IP不管访问了几个页面,独立IP数均为1


2.各个省份PV的统计:

我们的需求分析是统计网站日志文件中的各省份ID出现的次数,比如客户A访问了该网站,省份ID是5,客户B的省份ID是6,客户C的省份ID是8,客户D的省份ID是10,客户E的省份ID是5,客户F的省份ID是6…这样分析类比过来其实就是词频统计,得出的结果也是ID为5的有2人,ID为6的为2人,ID为8的为1人…


根据需求分析,下面为mapreduce各阶段的数据类型

  map -> input  <LongWritable, Text>
  map -> output  <IntWritable, IntWritable>
  reduce -> input  <IntWritable, IntWritable>
  reduce -> output  <IntWritable, IntWritable>


二、程序编写


在编写程序之前,我们需要分析一下每行数据各字段的含义,在编写程序的过程中对数据进行预处理

20.png

示例代码:

package com.kfk.hadoop.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WebPV extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class WebPvMapper extends Mapper<LongWritable, Text,IntWritable, IntWritable>{
        // 创建map输出的对象
        private static final IntWritable mapOutValue = new IntWritable(1);
        private static final IntWritable mapOutKey = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 每一行数据按"\t"分割
            String[] values = value.toString().split("\t");
            // 数据预处理:每一行数据少于30个字段就过滤掉
            if (values.length<30){
                // 统计每一行数据少于30个字段的个数
                context.getCounter("WEBPV_COUNTERS","LENGTH_LT30_COUNTER").increment(1);
                return;
            }
            String provinceIdValue = values[23];
            String url = values[1];
            // provinceIdValue为空的话将数据过滤掉
            if (StringUtils.isBlank(provinceIdValue)){
                // 统计provinceIdValue为空的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_ISBLACK_COUNTER").increment(1);
                return;
            }
            // url为空的话。将数据过滤掉
            if (StringUtils.isBlank(url)){
                // 统计url为空的个数
                context.getCounter("WEBPV_COUNTERS","URL_ISBLACK_COUNTER").increment(1);
                return;
            }
            // 如果provinceIdValue不能转换成Integer型,则将数据过滤
            int provinceId = 0;
            try{
                provinceId = Integer.valueOf(provinceIdValue);
            }catch (Exception e){
                // 统计provinceIdValue不能转换成Integer型的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_VALIDATE_COUNTER").increment(1);
                return;
            }
            // 将provinceId设置为map输出的key
            mapOutKey.set(provinceId);
            // map输出端的key和value
            context.write(mapOutKey,mapOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class WebPvReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        // 创建reduce输出的对象
        private static final IntWritable reduceOutValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对reduce输入的value求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            // 将sum设置为reduce输出的key
            reduceOutValue.set(sum);
            // reduce输出端的key和value
            context.write(key,reduceOutValue);
            // 打印出reduce输出端的key和value的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+reduceOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WebPvMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        job.setCombinerClass(WebPvReducer.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WebPvReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/2015082818",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WebPV(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

21.png22.png

注意:No reduce task的情况

当没有reduce任务的时候,combiner是不生效。但是,map端的shuffle过程是有的。


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
8天前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
144 2
|
8天前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
31 0
|
3天前
|
分布式计算 资源调度 Hadoop
MapReduce分布式编程
MapReduce分布式编程
10 1
|
8天前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
8天前
|
分布式计算 并行计算 Java
【分布式计算框架】 MapReduce编程初级实践
【分布式计算框架】 MapReduce编程初级实践
9 2
|
8天前
编程日志01:个人网站更新用户头像
编程日志01:个人网站更新用户头像
12 0
|
8天前
|
数据库
编程日记02:个人站优化数据库和日志
编程日记02:个人站优化数据库和日志
15 0
|
8天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
8天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
23 2
|
8天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
61 0

热门文章

最新文章