MapReduce编程案例之电商网站日志的行为分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

一、需求分析


1.网站数据分析的四个指标:


PV:PageView ,浏览量


用户每打开一个网页就会被记录1次浏览量,多次打开同一个页面浏览量累计加一


UV:Unique Visitor 独立访客数


同一用户多次访问,独立访客数只算一次


VV:visitor view,访客的访问次数


同一用户完成浏览并关闭该网站时,访客的访问次数算一次


IP:独立IP数


同一IP不管访问了几个页面,独立IP数均为1


2.各个省份PV的统计:

我们的需求分析是统计网站日志文件中的各省份ID出现的次数,比如客户A访问了该网站,省份ID是5,客户B的省份ID是6,客户C的省份ID是8,客户D的省份ID是10,客户E的省份ID是5,客户F的省份ID是6…这样分析类比过来其实就是词频统计,得出的结果也是ID为5的有2人,ID为6的为2人,ID为8的为1人…


根据需求分析,下面为mapreduce各阶段的数据类型

  map -> input  <LongWritable, Text>
  map -> output  <IntWritable, IntWritable>
  reduce -> input  <IntWritable, IntWritable>
  reduce -> output  <IntWritable, IntWritable>


二、程序编写


在编写程序之前,我们需要分析一下每行数据各字段的含义,在编写程序的过程中对数据进行预处理

20.png

示例代码:

package com.kfk.hadoop.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WebPV extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class WebPvMapper extends Mapper<LongWritable, Text,IntWritable, IntWritable>{
        // 创建map输出的对象
        private static final IntWritable mapOutValue = new IntWritable(1);
        private static final IntWritable mapOutKey = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 每一行数据按"\t"分割
            String[] values = value.toString().split("\t");
            // 数据预处理:每一行数据少于30个字段就过滤掉
            if (values.length<30){
                // 统计每一行数据少于30个字段的个数
                context.getCounter("WEBPV_COUNTERS","LENGTH_LT30_COUNTER").increment(1);
                return;
            }
            String provinceIdValue = values[23];
            String url = values[1];
            // provinceIdValue为空的话将数据过滤掉
            if (StringUtils.isBlank(provinceIdValue)){
                // 统计provinceIdValue为空的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_ISBLACK_COUNTER").increment(1);
                return;
            }
            // url为空的话。将数据过滤掉
            if (StringUtils.isBlank(url)){
                // 统计url为空的个数
                context.getCounter("WEBPV_COUNTERS","URL_ISBLACK_COUNTER").increment(1);
                return;
            }
            // 如果provinceIdValue不能转换成Integer型,则将数据过滤
            int provinceId = 0;
            try{
                provinceId = Integer.valueOf(provinceIdValue);
            }catch (Exception e){
                // 统计provinceIdValue不能转换成Integer型的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_VALIDATE_COUNTER").increment(1);
                return;
            }
            // 将provinceId设置为map输出的key
            mapOutKey.set(provinceId);
            // map输出端的key和value
            context.write(mapOutKey,mapOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class WebPvReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        // 创建reduce输出的对象
        private static final IntWritable reduceOutValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对reduce输入的value求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            // 将sum设置为reduce输出的key
            reduceOutValue.set(sum);
            // reduce输出端的key和value
            context.write(key,reduceOutValue);
            // 打印出reduce输出端的key和value的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+reduceOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WebPvMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        job.setCombinerClass(WebPvReducer.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WebPvReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/2015082818",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WebPV(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

21.png22.png

注意:No reduce task的情况

当没有reduce任务的时候,combiner是不生效。但是,map端的shuffle过程是有的。


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
3月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
69 1
|
4月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
66 2
|
5月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
91 1
|
7月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
106 15
|
6月前
|
Python
Python编程实战:利用闭包与装饰器优化日志记录功能
【7月更文挑战第7天】Python的闭包和装饰器简化了日志记录。通过定义如`log_decorator`的装饰器,可以在不修改原函数代码的情况下添加日志功能。当@log_decorator用于`add(x, y)`函数时,调用时自动记录日志。进一步,`timestamp_log_decorator`展示了如何创建特定功能的装饰器,如添加时间戳。这些技术减少了代码冗余,提高了代码的可维护性。
81 1
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
75 1
|
7月前
|
存储 Java 关系型数据库
基于JSP的九宫格日志网站
基于JSP的九宫格日志网站
|
7月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
60 0
|
7月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
106 0
|
7月前
|
Java 程序员
技术日志:揭秘Java编程 —— 抽象类与接口的隐藏力量!
【6月更文挑战第17天】在Java编程中,抽象类和接口如同内功心法,增强代码灵活性和维护性。抽象类`Course`定义共性属性和行为,如显示大纲,子类如`ProgrammingCourse`继承并实现细节。接口`Ratable`提供评分功能,允许不同课程以多态方式实现。通过抽象类和接口,代码组织更有序,系统扩展性更强,犹如武侠高手以平凡招式创出非凡武学。不断学习和探索这些技术,能提升编程技艺,应对复杂挑战。
51 0

热门文章

最新文章