MapReduce编程案例之电商网站日志的行为分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

一、需求分析


1.网站数据分析的四个指标:


PV:PageView ,浏览量


用户每打开一个网页就会被记录1次浏览量,多次打开同一个页面浏览量累计加一


UV:Unique Visitor 独立访客数


同一用户多次访问,独立访客数只算一次


VV:visitor view,访客的访问次数


同一用户完成浏览并关闭该网站时,访客的访问次数算一次


IP:独立IP数


同一IP不管访问了几个页面,独立IP数均为1


2.各个省份PV的统计:

我们的需求分析是统计网站日志文件中的各省份ID出现的次数,比如客户A访问了该网站,省份ID是5,客户B的省份ID是6,客户C的省份ID是8,客户D的省份ID是10,客户E的省份ID是5,客户F的省份ID是6…这样分析类比过来其实就是词频统计,得出的结果也是ID为5的有2人,ID为6的为2人,ID为8的为1人…


根据需求分析,下面为mapreduce各阶段的数据类型

  map -> input  <LongWritable, Text>
  map -> output  <IntWritable, IntWritable>
  reduce -> input  <IntWritable, IntWritable>
  reduce -> output  <IntWritable, IntWritable>


二、程序编写


在编写程序之前,我们需要分析一下每行数据各字段的含义,在编写程序的过程中对数据进行预处理

20.png

示例代码:

package com.kfk.hadoop.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WebPV extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class WebPvMapper extends Mapper<LongWritable, Text,IntWritable, IntWritable>{
        // 创建map输出的对象
        private static final IntWritable mapOutValue = new IntWritable(1);
        private static final IntWritable mapOutKey = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 每一行数据按"\t"分割
            String[] values = value.toString().split("\t");
            // 数据预处理:每一行数据少于30个字段就过滤掉
            if (values.length<30){
                // 统计每一行数据少于30个字段的个数
                context.getCounter("WEBPV_COUNTERS","LENGTH_LT30_COUNTER").increment(1);
                return;
            }
            String provinceIdValue = values[23];
            String url = values[1];
            // provinceIdValue为空的话将数据过滤掉
            if (StringUtils.isBlank(provinceIdValue)){
                // 统计provinceIdValue为空的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_ISBLACK_COUNTER").increment(1);
                return;
            }
            // url为空的话。将数据过滤掉
            if (StringUtils.isBlank(url)){
                // 统计url为空的个数
                context.getCounter("WEBPV_COUNTERS","URL_ISBLACK_COUNTER").increment(1);
                return;
            }
            // 如果provinceIdValue不能转换成Integer型,则将数据过滤
            int provinceId = 0;
            try{
                provinceId = Integer.valueOf(provinceIdValue);
            }catch (Exception e){
                // 统计provinceIdValue不能转换成Integer型的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_VALIDATE_COUNTER").increment(1);
                return;
            }
            // 将provinceId设置为map输出的key
            mapOutKey.set(provinceId);
            // map输出端的key和value
            context.write(mapOutKey,mapOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class WebPvReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        // 创建reduce输出的对象
        private static final IntWritable reduceOutValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对reduce输入的value求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            // 将sum设置为reduce输出的key
            reduceOutValue.set(sum);
            // reduce输出端的key和value
            context.write(key,reduceOutValue);
            // 打印出reduce输出端的key和value的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+reduceOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WebPvMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        job.setCombinerClass(WebPvReducer.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WebPvReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/2015082818",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WebPV(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

21.png22.png

注意:No reduce task的情况

当没有reduce任务的时候,combiner是不生效。但是,map端的shuffle过程是有的。


相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
Prometheus Cloud Native Go
Golang语言之Prometheus的日志模块使用案例
这篇文章是关于如何在Golang语言项目中使用Prometheus的日志模块的案例,包括源代码编写、编译和测试步骤。
261 4
Golang语言之Prometheus的日志模块使用案例
|
存储 数据可视化 安全
一个简单案例,带你看懂GC日志!
一个简单案例,带你看懂GC日志!
112 0
一个简单案例,带你看懂GC日志!
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
217 1
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
212 2
|
Python
Python编程实战:利用闭包与装饰器优化日志记录功能
【7月更文挑战第7天】Python的闭包和装饰器简化了日志记录。通过定义如`log_decorator`的装饰器,可以在不修改原函数代码的情况下添加日志功能。当@log_decorator用于`add(x, y)`函数时,调用时自动记录日志。进一步,`timestamp_log_decorator`展示了如何创建特定功能的装饰器,如添加时间戳。这些技术减少了代码冗余,提高了代码的可维护性。
207 1
|
SQL 关系型数据库 MySQL
MySQL数据库——索引(3)-索引语法(创建索引、查看索引、删除索引、案例演示),SQL性能分析(SQL执行频率,慢查询日志)
MySQL数据库——索引(3)-索引语法(创建索引、查看索引、删除索引、案例演示),SQL性能分析(SQL执行频率,慢查询日志)
286 2
|
域名解析 缓存 监控
【域名解析 DNS 专栏】DNS 查询日志分析:洞察网络行为与优化建议
【5月更文挑战第28天】DNS查询日志分析对于理解和优化网络行为至关重要。通过日志,可洞察用户访问偏好、流量分布,进而进行缓存优化、负载均衡和安全检测。简单Python代码示例展示了如何读取和分析日志。根据分析结果,可针对性设置优化策略,提升网络性能、稳定性和安全性。不断探索新的分析方法,充分挖掘DNS查询日志的价值,以驱动网络持续优化。
736 3
|
XML Java 数据格式
支付系统----微信支付20---创建案例项目--集成Mybatis-plus的补充,target下只有接口的编译文件,xml文件了,添加日志的写法
支付系统----微信支付20---创建案例项目--集成Mybatis-plus的补充,target下只有接口的编译文件,xml文件了,添加日志的写法
|
存储 Java 关系型数据库
基于JSP的九宫格日志网站
基于JSP的九宫格日志网站
|
Java 程序员
技术日志:揭秘Java编程 —— 抽象类与接口的隐藏力量!
【6月更文挑战第17天】在Java编程中,抽象类和接口如同内功心法,增强代码灵活性和维护性。抽象类`Course`定义共性属性和行为,如显示大纲,子类如`ProgrammingCourse`继承并实现细节。接口`Ratable`提供评分功能,允许不同课程以多态方式实现。通过抽象类和接口,代码组织更有序,系统扩展性更强,犹如武侠高手以平凡招式创出非凡武学。不断学习和探索这些技术,能提升编程技艺,应对复杂挑战。
122 0

热门文章

最新文章