MapReduce编程案例之电商网站日志的行为分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 笔记

一、需求分析


1.网站数据分析的四个指标:


PV:PageView ,浏览量


用户每打开一个网页就会被记录1次浏览量,多次打开同一个页面浏览量累计加一


UV:Unique Visitor 独立访客数


同一用户多次访问,独立访客数只算一次


VV:visitor view,访客的访问次数


同一用户完成浏览并关闭该网站时,访客的访问次数算一次


IP:独立IP数


同一IP不管访问了几个页面,独立IP数均为1


2.各个省份PV的统计:

我们的需求分析是统计网站日志文件中的各省份ID出现的次数,比如客户A访问了该网站,省份ID是5,客户B的省份ID是6,客户C的省份ID是8,客户D的省份ID是10,客户E的省份ID是5,客户F的省份ID是6…这样分析类比过来其实就是词频统计,得出的结果也是ID为5的有2人,ID为6的为2人,ID为8的为1人…


根据需求分析,下面为mapreduce各阶段的数据类型

  map -> input  <LongWritable, Text>
  map -> output  <IntWritable, IntWritable>
  reduce -> input  <IntWritable, IntWritable>
  reduce -> output  <IntWritable, IntWritable>


二、程序编写


在编写程序之前,我们需要分析一下每行数据各字段的含义,在编写程序的过程中对数据进行预处理

20.png

示例代码:

package com.kfk.hadoop.mr;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WebPV extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class WebPvMapper extends Mapper<LongWritable, Text,IntWritable, IntWritable>{
        // 创建map输出的对象
        private static final IntWritable mapOutValue = new IntWritable(1);
        private static final IntWritable mapOutKey = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 每一行数据按"\t"分割
            String[] values = value.toString().split("\t");
            // 数据预处理:每一行数据少于30个字段就过滤掉
            if (values.length<30){
                // 统计每一行数据少于30个字段的个数
                context.getCounter("WEBPV_COUNTERS","LENGTH_LT30_COUNTER").increment(1);
                return;
            }
            String provinceIdValue = values[23];
            String url = values[1];
            // provinceIdValue为空的话将数据过滤掉
            if (StringUtils.isBlank(provinceIdValue)){
                // 统计provinceIdValue为空的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_ISBLACK_COUNTER").increment(1);
                return;
            }
            // url为空的话。将数据过滤掉
            if (StringUtils.isBlank(url)){
                // 统计url为空的个数
                context.getCounter("WEBPV_COUNTERS","URL_ISBLACK_COUNTER").increment(1);
                return;
            }
            // 如果provinceIdValue不能转换成Integer型,则将数据过滤
            int provinceId = 0;
            try{
                provinceId = Integer.valueOf(provinceIdValue);
            }catch (Exception e){
                // 统计provinceIdValue不能转换成Integer型的个数
                context.getCounter("WEBPV_COUNTERS","PROVINCEID_VALIDATE_COUNTER").increment(1);
                return;
            }
            // 将provinceId设置为map输出的key
            mapOutKey.set(provinceId);
            // map输出端的key和value
            context.write(mapOutKey,mapOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class WebPvReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        // 创建reduce输出的对象
        private static final IntWritable reduceOutValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对reduce输入的value求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            // 将sum设置为reduce输出的key
            reduceOutValue.set(sum);
            // reduce输出端的key和value
            context.write(key,reduceOutValue);
            // 打印出reduce输出端的key和value的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+reduceOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WebPvMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        job.setCombinerClass(WebPvReducer.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WebPvReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/2015082818",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WebPV(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

21.png22.png

注意:No reduce task的情况

当没有reduce任务的时候,combiner是不生效。但是,map端的shuffle过程是有的。


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
9天前
|
存储 SQL 监控
|
9天前
|
运维 监控 安全
|
12天前
|
监控 关系型数据库 MySQL
分析慢查询日志
【10月更文挑战第29天】分析慢查询日志
28 3
|
12天前
|
监控 关系型数据库 数据库
怎样分析慢查询日志?
【10月更文挑战第29天】怎样分析慢查询日志?
29 2
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1619 14
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
32 4
|
1月前
|
存储 数据可视化 安全
一个简单案例,带你看懂GC日志!
一个简单案例,带你看懂GC日志!
一个简单案例,带你看懂GC日志!
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
43 2
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
34 1
|
2月前
|
Prometheus Cloud Native Go
Golang语言之Prometheus的日志模块使用案例
这篇文章是关于如何在Golang语言项目中使用Prometheus的日志模块的案例,包括源代码编写、编译和测试步骤。
51 3
Golang语言之Prometheus的日志模块使用案例