大数据Spark对SogouQ日志分析

简介: 大数据Spark对SogouQ日志分析

1 业务需求

使用搜狗实验室提供【用户查询日志(SogouQ)】数据,使用Spark框架,将数据封装到RDD中

进行业务数据处理分析。数据网址:http://www.sogou.com/labs/resource/q.php



  • 1)、数据介绍:搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页
    查询需求及用户点击情况的网页查询日志数据集合。
  • 2)、数据格式
  1. 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
  2. 用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览
    器输入的不同查询对应同一个用户ID
  • 3)、数据下载:分为三个数据集,大小不一样
  1. 迷你版(样例数据, 376KB): http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
  2. 精简版(1天数据,63MB): http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
  1. 完整版(1.9GB): http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip

针对SougoQ用户查询日志数据中不同字段,不同业务进行统计分析:

使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数处理分析,灵活掌握Scala语言编程。

2 准备工作

在编程实现业务功能之前,首先考虑如何对【查询词】进行中文分词及将日志数据解析封装。

2.1 HanLP 中文分词

使用比较流行好用中文分词:HanLP,面向生产环境的自然语言处理工具包,HanLP 是由一

系列模型与算法组成的 Java 工具包,目标是普及自然语言处理在生产环境中的应用。

官方网站:http://www.hanlp.com/,添加Maven依赖

<!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>

演示范例:HanLP 入门案例,基本使用

import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import scala.collection.JavaConverters._
/**
 * HanLP 入门案例,基本使用
 */
object HanLPTest {
  def main(args: Array[String]): Unit = {
    // 入门Demo
    val terms: util.List[Term] = HanLP.segment("杰克奥特曼全集视频")
    println(terms)
    println(terms.asScala.map(_.word.trim))
    // 标准分词
    val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重阳")
    println(terms1)
    println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))
    val words: Array[String] =
      """00:00:00 2982199073774412 [360安全卫
士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"""
        .split("\\s+")
    words.foreach(println)
    println(words(2).replaceAll("\\[|\\]", ""))
  }
}

2.2 样例类 SogouRecord

将每行日志数据封装到CaseClass样例类SogouRecord中,方便后续处理:

/**
 * 用户搜索点击网页记录Record
 *
 * @param queryTime  访问时间,格式为:HH:mm:ss
 * @param userId     用户ID
 * @param queryWords 查询词
 * @param resultRank 该URL在返回结果中的排名
 * @param clickRank  用户点击的顺序号
 * @param clickUrl   用户点击的URL
 */
case class SogouRecord(
                        queryTime: String, //
                        userId: String, //
                        queryWords: String, //
                        resultRank: Int, //
                        clickRank: Int, //
                        clickUrl: String //
                      )

3 业务实现

先读取数据,封装到SougoRecord类中,再按照业务处理数据。

3.1 读取数据

构建SparkContext实例对象,读取本次SogouQ.sample数据,封装到SougoRecord中 。

// TODO: 1. 本地读取SogouQ用户查询日志数据
//val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample")
val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced")
//println(s"Count = ${rawLogsRDD.count()}")
// TODO: 2. 解析数据,封装到CaseClass样例类中
val recordsRDD: RDD[SogouRecord] = rawLogsRDD
  // 过滤不合法数据,如null,分割后长度不等于6
  .filter(log => null != log && log.trim.split("\\s+").length == 6)
  // 对每个分区中数据进行解析,封装到SogouRecord
  .mapPartitions { iter =>
    iter.map { log =>
      val arr: Array[String] = log.trim.split("\\s+")
      SogouRecord(
        arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), //
        arr(3).toInt, arr(4).toInt, arr(5) //
      )
    }
  }
println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")

3.2 搜索关键词统计

获取用户【查询词】,使用HanLP进行分词,按照单词分组聚合统计出现次数,类似WordCount

程序,具体代码如下:

// =================== 3.1 搜索关键词统计 ===================
// a. 获取搜索词,进行中文分词
val wordsRDD: RDD[String] = recordsRDD.mapPartitions { iter =>
  iter.flatMap { record =>
    // 使用HanLP中文分词库进行分词
    val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
    // 将Java中集合对转换为Scala中集合对象
    import scala.collection.JavaConverters._
    terms.asScala.map(term => term.word)
  }
}
//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")
// b. 统计搜索词出现次数,获取次数最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD
  .map(word => (word, 1)) // 每个单词出现一次
  .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
  .map(tuple => tuple.swap)
  .sortByKey(ascending = false) // 词频降序排序
  .take(10) // 获取前10个搜索词
top10SearchWords.foreach(println)

运行结果如下,仅仅显示搜索最多关键词,其中需要过滤谓词:

3.3 用户搜索点击统计

统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用

户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数。

// =================== 3.2 用户搜索点击次数统计 ===================
/*
每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
  .map { record =>
    // 获取用户ID和搜索词
    val key = record.userId -> record.queryWords
    (key, 1)
  }
  // 按照用户ID和搜索词组合的Key分组聚合
  .reduceByKey((tmp, item) => tmp + item)
clickCountRDD
  .sortBy(tuple => tuple._2, ascending = false)
  .take(10).foreach(println)
println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")

程序运行结果如下:

3.4 搜索时间段统计

按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察

用户喜欢在哪些时间段上网,使用搜狗引擎搜索,代码如下:

// =================== 3.3 搜索时间段统计 ===================
/*
从搜索时间字段获取小时,统计个小时搜索次数
*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD
  // 提取小时
  .map { record =>
    // 03:12:50
    record.queryTime.substring(0, 2)
  }
  // 分组聚合
  .map(word => (word, 1)) // 每个单词出现一次
  .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
  .sortBy(tuple => tuple._2, ascending = false)
hourSearchRDD.foreach(println)

程序运行结果如下:

3.5 完整代码

业务实现完整代码SogouQueryAnalysis如下所示:、

import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
 * 用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。
 * 1. 搜索关键词统计,使用HanLP中文分词
 * 2. 用户搜索次数统计
 * 3. 搜索时间段统计
 * 数据格式:
 * 访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
 * 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对
 * 应同一个用户ID
 */
object SogouQueryAnalysis {
  def main(args: Array[String]): Unit = {
    // 构建SparkContext上下文实例对象
    val sc: SparkContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setMaster("local[2]")
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      // b. 创建SparkContext, 有就获取,没有就创建,建议使用
      val context = SparkContext.getOrCreate(sparkConf)
      // c. 返回对象
      context
    }
    sc.setLogLevel("WARN")
    // TODO: 1. 本地读取SogouQ用户查询日志数据
    //val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.sample")
    val rawLogsRDD: RDD[String] = sc.textFile("datas/sogou/SogouQ.reduced")
    //println(s"Count = ${rawLogsRDD.count()}")
    // TODO: 2. 解析数据,封装到CaseClass样例类中
    val recordsRDD: RDD[SogouRecord] = rawLogsRDD
      // 过滤不合法数据,如null,分割后长度不等于6
      .filter(log => null != log && log.trim.split("\\s+").length == 6)
      // 对每个分区中数据进行解析,封装到SogouRecord
      .mapPartitions { iter =>
        iter.map { log =>
          val arr: Array[String] = log.trim.split("\\s+")
          SogouRecord(
            arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), //
            arr(3).toInt, arr(4).toInt, arr(5) //
          )
        }
      }
    println(s"Count = ${recordsRDD.count()}, First = ${recordsRDD.first()}")
    // 数据使用多次,进行缓存操作,使用count触发
    recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()
    // TODO: 3. 依据需求统计分析
    /*
    1. 搜索关键词统计,使用HanLP中文分词
    2. 用户搜索次数统计
    3. 搜索时间段统计
    */
    // =================== 3.1 搜索关键词统计 ===================
    // a. 获取搜索词,进行中文分词
    val wordsRDD: RDD[String] = recordsRDD.mapPartitions { iter =>
      iter.flatMap { record =>
        // 使用HanLP中文分词库进行分词
        val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
        // 将Java中集合对转换为Scala中集合对象
        import scala.collection.JavaConverters._
        terms.asScala.map(term => term.word)
      }
    }
    //println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")
    // b. 统计搜索词出现次数,获取次数最多Top10
    val top10SearchWords: Array[(Int, String)] = wordsRDD
      .map(word => (word, 1)) // 每个单词出现一次
      .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
      .map(tuple => tuple.swap)
      .sortByKey(ascending = false) // 词频降序排序
      .take(10) // 获取前10个搜索词
    top10SearchWords.foreach(println)
    // =================== 3.2 用户搜索点击次数统计 ===================
    /*
    每个用户在搜索引擎输入关键词以后,统计点击网页数目,反应搜索引擎准确度
    先按照用户ID分组,再按照搜索词分组,统计出每个用户每个搜索词点击网页个数
    */
    val clickCountRDD: RDD[((String, String), Int)] = recordsRDD
      .map { record =>
        // 获取用户ID和搜索词
        val key = record.userId -> record.queryWords
        (key, 1)
      }
      // 按照用户ID和搜索词组合的Key分组聚合
      .reduceByKey((tmp, item) => tmp + item)
    clickCountRDD
      .sortBy(tuple => tuple._2, ascending = false)
      .take(10).foreach(println)
    println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
    println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
    println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")
    // =================== 3.3 搜索时间段统计 ===================
    /*
    从搜索时间字段获取小时,统计个小时搜索次数
    */
    val hourSearchRDD: RDD[(String, Int)] = recordsRDD
      // 提取小时
      .map { record =>
        // 03:12:50
        record.queryTime.substring(0, 2)
      }
      // 分组聚合
      .map(word => (word, 1)) // 每个单词出现一次
      .reduceByKey((tmp, item) => tmp + item) // 分组统计次数
      .sortBy(tuple => tuple._2, ascending = false)
    hourSearchRDD.foreach(println)
    // 释放缓存数据
    recordsRDD.unpersist()
    // 应用结束,关闭资源
    sc.stop()
  }
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0
|
15天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
3天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
2月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
122 0
|
3月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
3月前
|
canal 消息中间件 关系型数据库
大数据数据库增量日志采集之Canal
大数据数据库增量日志采集之Canal
|
25天前
|
Java
使用Java代码打印log日志
使用Java代码打印log日志
81 1
|
26天前
|
Linux Shell
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
78 1
|
30天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
100 1