Web 日志mapreduce 预处理清洗

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

WEB访问日志


即指用户访问网站时的所有访问、浏览、点击行为数据。比如点击了哪一个链接,在哪个网页停留时间最多,采用了哪个搜索项、总体浏览时间等。而所有这些信息都可被保存在网站日志中。通过分析这些数据,可以获知许多对网站运营至关重要的信息。采集的数据越全面,分析就能越精准。


  • 日志的生成渠道:
    1.是网站的web服务器所记录的web访问日志
    2.是通过在页面嵌入自定义的js代码来获取用户的所有访问行为(比如鼠标悬停的位置,点击的页面组件等),然后通过ajax请求到后台记录日志;这种方式所能采集的信息最全面;
    3.通过在页面上埋点1像素的图片,将相关页面访问信息请求到后台记录日志;

  • 日志数据内容详述:
    在实际操作中,有以下几个方面的数据可以被采集:
    1.访客的系统属性特征。比如所采用的操作系统、浏览器、域名和访问速度等。
    2.访问特征。包括停留时间、点击的URL等。
    3.来源特征。包括网络内容信息类型、内容分类和来访URL等。

  • 以网站点击日志为例,其点击日志格式如下:
    194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
    163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"

  • 用于生成点击流的访问日志表
时间戳 IP地址 Cookie Session 请求URL Referal
2012-01-01 12:31:12 101.0.0.1 User01 S001 /a/... somesite.com
2012-01-01 12:31:16 201.0.0.2 User02 S002 /a/... -
2012-01-01 12:33:06 101.0.0.2 User03 S002 /b/... baidu.com
2012-01-01 15:16:39 234.0.0.3 User01 S003 /c/... google.com
2012-01-01 15:17:11 101.0.0.1 User01 S004 /d/... /c/...
2012-01-01 15:19:23 101.0.0.1 User01 S004 /e/... /d/....

至此开始正文,数据预处理


  • 主要目的:

    1. 过滤“不合规”数据
  • 2.格式转换和规整
  • 3.根据后续的统计需求,过滤分离出各种不同主题(不同栏目path)的基础数据

  • 实现方式:

  • 开发一个mr程序WeblogPreProcess

  • 第一个 WebLogBean.java 程序,定义 日志格式


/**
 * 对接外部数据的层,表结构定义最好跟外部数据源保持一致
 * 术语:贴源表
 * @author 
 *
 */
public class WebLogBean implements Writable {

    private boolean valid = true;       // 判断数据是否合法
    private String remote_addr;         // 记录客户端的ip地址
    private String remote_user;         // 记录客户端用户名称,忽略属性"-"
    private String time_local;              // 记录访问时间与时区
    private String request;                 // 记录请求的url与http协议
    private String status;                      // 记录请求状态;成功是200
    private String body_bytes_sent; // 记录发送给客户端文件主体内容大小
    private String http_referer;            // 用来记录从那个页面链接访问过来的
    private String http_user_agent; // 记录客户浏览器的相关信息

    public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return this.time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.getRemote_addr());
        sb.append("\001").append(this.getRemote_user());
        sb.append("\001").append(this.getTime_local());
        sb.append("\001").append(this.getRequest());
        sb.append("\001").append(this.getStatus());
        sb.append("\001").append(this.getBody_bytes_sent());
        sb.append("\001").append(this.getHttp_referer());
        sb.append("\001").append(this.getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(null==remote_addr?"":remote_addr);
        out.writeUTF(null==remote_user?"":remote_user);
        out.writeUTF(null==time_local?"":time_local);
        out.writeUTF(null==request?"":request);
        out.writeUTF(null==status?"":status);
        out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
        out.writeUTF(null==http_referer?"":http_referer);
        out.writeUTF(null==http_user_agent?"":http_user_agent);

    }

}
  • 第二个 WebLogParser.java程序,筛选 和 转换 日期格式

public class WebLogParser {

    static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    public static WebLogBean parser(String line){
        WebLogBean WebLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11){
            WebLogBean.setRemote_addr(arr[0]);
            WebLogBean.setRemote_user(arr[1]);
            String time_local = formatDate(arr[3].substring(1));
            if(null==time_local) time_local="-invalid_time-";
            WebLogBean.setTime_local(time_local);
            WebLogBean.setRequest(arr[6]);
            WebLogBean.setStatus(arr[8]);
            WebLogBean.setBody_bytes_sent(arr[9]);
            WebLogBean.setHttp_referer(arr[10]);

            //如果useragent元素较多,拼接useragent
            if (arr.length > 12){
                StringBuilder sb = new StringBuilder();
                for (int i=11; i<arr.length;i++){
                    sb.append(arr[i]);
                }
                WebLogBean.setHttp_user_agent(sb.toString());
            } else {
                WebLogBean.setHttp_user_agent(arr[11]);
            }
            // 大于400,HTTP错误
            if (Integer.parseInt(WebLogBean.getStatus()) >= 400){
                WebLogBean.setValid(false);
            }

            if("-invalid_time-".equals(WebLogBean.getTime_local())){
                WebLogBean.setValid(false);
            }

        } else {
            WebLogBean.setValid(false);
        }
        return WebLogBean;
    }

    public static void filtStaticResource(WebLogBean bean,Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        } 
    }
    //更换日期的显示格式
    public static String formatDate(String time_local){

        try {
            return df2.format(df1.parse(time_local));

        } catch (ParseException e) {
            return null;
        }
    }
}

最后一个 WeblogPreProcess.java 主程序


/**
 * 处理原始日志,过滤出真实pv请求
 * 转换时间格式
 * 对缺失字段填充默认值
 * 对记录标记valid和invalid
 * 
 * @author 
 *
 */
public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        //用来存储网站url分类数据
        Set<String>pages=new HashSet<String>();
        Text k =new Text();
        NullWritable v = NullWritable.get();

        /**
         * 从外部加载网站url分类数据
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            pages.add("/about");
            pages.add("/black-ip-list/");
            pages.add("/cassandra-clustor/");
            pages.add("/finance-rhive-repurchase/");
            pages.add("/hadoop-family-roadmap/");
            pages.add("/hadoop-hive-intro/");
            pages.add("/hadoop-zookeeper-intro/");
            pages.add("/hadoop-mahout-roadmap/");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            WebLogBean WebLogBean = WebLogParser.parser(line);
            //过滤 js/图片/css 等静态资源
            WebLogParser.filtStaticResource(WebLogBean, pages);
            /**
             if (!WebLogBean.isValid()) 
                 return; 
            */
            k.set(WebLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance();

        job.setJarByClass(WeblogPreProcess.class);

        job.setMapperClass(WeblogPreProcessMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //FileInputFormat.setInputPaths(job, new Path(args[0]));
        //FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("E:/srcdata/webLog/input"));
        FileOutputFormat.setOutputPath(job, new Path("E:/srcdata/webLog/output"));

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

  • 最后预清洗完的日志格式如下:
    true1.80.249.223-2013-09-18 07:57:33/hadoop-hive-intro/20014764"http://www.google.com.hk/url?sa=t&rct=j&q=hive%E7%9A%84%E5%AE%89%E8%A3%85&source=web&cd=2&ved=0CC4QFjAB&url=%68%74%74%70%3a%2f%2f%62%6c%6f%67%2e%66%65%6e%73%2e%6d%65%2f%68%61%64%6f%6f%70%2d%68%69%76%65%2d%69%6e%74%72%6f%2f&ei=5lw5Uo-2NpGZiQfCwoG4BA&usg=AFQjCNF8EFxPuCMrm7CvqVgzcBUzrJZStQ&bvm=bv.52164340,d.aGc&cad=rjt""Mozilla/5.0(WindowsNT5.2;rv:23.0)Gecko/20100101Firefox/23.0"
    true101.226.167.201-2013-09-18 09:30:36/hadoop-mahout-roadmap/20010335"http://blog.fens.me/hadoop-mahout-roadmap/""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"
    true101.226.167.205-2013-09-18 09:30:32/hadoop-family-roadmap/20011715"http://blog.fens.me/hadoop-family-roadmap/""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"
    true101.226.169.215-2013-09-18 10:07:31/about3015"http://blog.fens.me/about""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"

    文章到这里就完成了,谢谢观看。



本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/2058352,如需转载请自行联系原作者
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
52 1
|
2月前
|
数据采集 机器学习/深度学习 存储
使用 Python 清洗日志数据
使用 Python 清洗日志数据
49 2
|
4月前
|
存储 JSON 监控
FastAPI日志之谜:如何揭开Web应用监控与调试的面纱?
【8月更文挑战第31天】在现代Web开发中,日志记录对于监控应用状态、诊断问题和了解用户行为至关重要。FastAPI框架提供了强大的日志功能,使开发者能轻松集成日志记录。本文将详细介绍如何在FastAPI中设置和利用日志,包括基础配置、请求响应日志、错误处理和结构化日志等内容,帮助提升应用的可维护性和性能。
195 0
|
4月前
|
Prometheus 监控 Cloud Native
Web服务器的日志分析与监控
【8月更文第28天】Web服务器日志提供了关于服务器活动的重要信息,包括访问记录、错误报告以及性能数据。有效地分析这些日志可以帮助我们了解用户行为、诊断问题、优化网站性能,并确保服务的高可用性。本文将介绍如何使用日志分析和实时监控工具来监测Web服务器的状态和性能指标,并提供具体的代码示例。
539 0
|
5月前
|
开发框架 NoSQL 前端开发
在Winform项目和Web API的.NetCore项目中使用Serilog 来记录日志信息
在Winform项目和Web API的.NetCore项目中使用Serilog 来记录日志信息
|
4月前
【Azure 应用服务】通过 Web.config 开启 dotnet 应用的 stdoutLog 日志,查看App Service 产生500错误的原因
【Azure 应用服务】通过 Web.config 开启 dotnet 应用的 stdoutLog 日志,查看App Service 产生500错误的原因
|
4月前
|
存储 Linux 网络安全
【Azure 应用服务】App Service For Linux 如何在 Web 应用实例上住抓取网络日志
【Azure 应用服务】App Service For Linux 如何在 Web 应用实例上住抓取网络日志
|
4月前
【Azure 云服务】Azure Cloud Service 为 Web Role(IIS Host)增加自定义字段 (把HTTP Request Header中的User-Agent字段增加到IIS输出日志中)
【Azure 云服务】Azure Cloud Service 为 Web Role(IIS Host)增加自定义字段 (把HTTP Request Header中的User-Agent字段增加到IIS输出日志中)
|
4月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
开发框架 .NET API
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
227 0