六十五、Spark-综合案例(搜狗搜索日志分析)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 六十五、Spark-综合案例(搜狗搜索日志分析)

搜狗搜索日志官网:http://www.sogou.com/labs/resource/q.php


37.png

迷你版日志下载链接:http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip

38.png


        注:由于进行测试使用,迷你版数据就可以满足需求


原数据展示


39.png

       注:原数据存在10000条 ,字段分别为:访问时间 \t 用户ID \t [查询词] \t 该URL在返回结果中的排名 \t 用户点击的顺序号 \t  用户点击的URL


业务需求


需求说明: 对SougouSearchLog进行分词并统计如下指标:


1.热门搜索词

2.用户热门搜索词(带上用户id)

3.各个时间段搜索热度


40.png


业务逻辑


业务逻辑:针对SougoQ用户查询日志数据中不同字段,使用SparkContext读取日志数据,封装到RDD数据集中,调用Transformation函数和Action函数进行处理不同业务统计分析


分词工具


HanLP官网:http://www.sogou.com/labs/resource/q.php

41.png

42.png



        HanLP主要功能:基于HanLP最新技术,使用亿级通用语料库训练,直接API调用,简单高效!


Maven依赖


<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>

HanLP入门案例


package org.example.spark
import java.util
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
/**
 * Author tuomasi
 * Desc HanLP入门案例
 */
object HanLPTest {
  def main(args: Array[String]): Unit = {
    val words = "[HanLP入门案例]"
    val terms: util.List[Term] = HanLP.segment(words) //分段
    println(terms) //直接打印java的list:[[/w, HanLP/nx, 入门/vn, 案例/n, ]/w]
    import scala.collection.JavaConverters._
    println(terms.asScala.map(_.word)) //转为scala的list:ArrayBuffer([, HanLP, 入门, 案例, ])
    val cleanWords1: String = words.replaceAll("\\[|\\]", "") //将"["或"]"替换为空"" //"HanLP入门案例"
    println(cleanWords1) //HanLP入门案例
    println(HanLP.segment(cleanWords1).asScala.map(_.word)) //ArrayBuffer(HanLP, 入门, 案例)
    val log = """00:00:00 2982199073774412    [360安全卫士]   8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html"""
    val cleanWords2 = log.split("\\s+")(2) //[360安全卫士]
      .replaceAll("\\[|\\]", "") //360安全卫士
    println(HanLP.segment(cleanWords2).asScala.map(_.word)) //ArrayBuffer(360, 安全卫士)
  }
}

控制台打印效果


43.png


代码实现


package org.example.spark
import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import shapeless.record
import spire.std.tuples
import scala.collection.immutable.StringOps
import scala.collection.mutable
/**
 * Author tuomasi
 * Desc 需求:对SougouSearchLog进行分词并统计如下指标:
 * 1.热门搜索词
 * 2.用户热门搜索词(带上用户id)
 * 3.各个时间段搜索热度
 */
object SougouSearchLogAnalysis {
  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //TODO 1.加载数据
    val lines: RDD[String] = sc.textFile("data/input/SogouQ.sample")
    //TODO 2.处理数据
    //封装数据
    val SogouRecordRDD: RDD[SogouRecord] = lines.map(line => { //map是一个进去一个出去
      val arr: Array[String] = line.split("\\s+")
      SogouRecord(
        arr(0),
        arr(1),
        arr(2),
        arr(3).toInt,
        arr(4).toInt,
        arr(5)
      )
    })
    //切割数据
    /* val wordsRDD0: RDD[mutable.Buffer[String]] = SogouRecordRDD.map(record => {
       val wordsStr: String = record.queryWords.replaceAll("\\[|\\]", "") //360安全卫士
       import scala.collection.JavaConverters._ //将Java集合转为scala集合
       HanLP.segment(wordsStr).asScala.map(_.word) //ArrayBuffer(360, 安全卫士)
     })*/
    val wordsRDD: RDD[String] = SogouRecordRDD.flatMap(record => { //flatMap是一个进去,多个出去(出去之后会被压扁) //360安全卫士==>[360, 安全卫士]
      val wordsStr: String = record.queryWords.replaceAll("\\[|\\]", "") //360安全卫士
      import scala.collection.JavaConverters._ //将Java集合转为scala集合
      HanLP.segment(wordsStr).asScala.map(_.word) //ArrayBuffer(360, 安全卫士)
    })
    //TODO 3.统计指标
    //--1.热门搜索词
    val result1: Array[(String, Int)] = wordsRDD
      .filter(word => !word.equals(".") && !word.equals("+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .take(10)
    //--2.用户热门搜索词(带上用户id)
    val userIdAndWordRDD: RDD[(String, String)] = SogouRecordRDD.flatMap(record => { //flatMap是一个进去,多个出去(出去之后会被压扁) //360安全卫士==>[360, 安全卫士]
      val wordsStr: String = record.queryWords.replaceAll("\\[|\\]", "") //360安全卫士
      import scala.collection.JavaConverters._ //将Java集合转为scala集合
      val words: mutable.Buffer[String] = HanLP.segment(wordsStr).asScala.map(_.word) //ArrayBuffer(360, 安全卫士)
      val userId: String = record.userId
      words.map(word => (userId, word))
    })
    val result2: Array[((String, String), Int)] = userIdAndWordRDD
      .filter(t => !t._2.equals(".") && !t._2.equals("+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .take(10)
    //--3.各个时间段搜索热度
    val result3: Array[(String, Int)] = SogouRecordRDD.map(record => {
      val timeStr: String = record.queryTime
      val hourAndMitunesStr: String = timeStr.substring(0, 5)
      (hourAndMitunesStr, 1)
    }).reduceByKey(_ + _)
      .sortBy(_._2, false)
      .take(10)
    //TODO 4.输出结果
    result1.foreach(println)
    result2.foreach(println)
    result3.foreach(println)
    //TODO 5.释放资源
    sc.stop()
  }
  //准备一个样例类用来封装数据
  /**
   * 用户搜索点击网页记录Record
   *
   * @param queryTime  访问时间,格式为:HH:mm:ss
   * @param userId     用户ID
   * @param queryWords 查询词
   * @param resultRank 该URL在返回结果中的排名
   * @param clickRank  用户点击的顺序号
   * @param clickUrl   用户点击的URL
   */
  case class SogouRecord(
                          queryTime: String,
                          userId: String,
                          queryWords: String,
                          resultRank: Int,
                          clickRank: Int,
                          clickUrl: String
                        )
}

效果展现


44.png

        注:对SougouSearchLog进行分词并统计出了如下指标,热门搜索词,用户热门搜索词(带上用户id),各个时间段搜索热度,此效果与预期想法基本一致


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
49 5
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
3月前
|
存储 搜索推荐 大数据
阿里泛日志设计与实践问题之schema-on-read技术的发展对日志搜索的影响是啥,如何解决
阿里泛日志设计与实践问题之schema-on-read技术的发展对日志搜索的影响是啥,如何解决
|
1月前
|
存储 数据可视化 安全
一个简单案例,带你看懂GC日志!
一个简单案例,带你看懂GC日志!
一个简单案例,带你看懂GC日志!
|
1月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
39 1
|
2月前
|
Prometheus Cloud Native Go
Golang语言之Prometheus的日志模块使用案例
这篇文章是关于如何在Golang语言项目中使用Prometheus的日志模块的案例,包括源代码编写、编译和测试步骤。
51 3
Golang语言之Prometheus的日志模块使用案例
|
1月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
62 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
29 0