大数据Spark实时搜索日志实时分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 大数据Spark实时搜索日志实时分析

1 业务场景

百度搜索风云榜(http://top.baidu.com/)以数亿网民的单日搜索行为作为数据基础,以搜索关键词为统计对象建立权威全面的各类关键词排行榜,以榜单形式向用户呈现基于百度海量搜索数据的排行信息,线上覆盖十余个行业类别,一百多个榜单

在【热点榜单】中,可以看到依据搜索关键词实时统计各种维度热点,下图展示【实时热点】。

仿【百度搜索风云榜】对用户使用百度搜索时日志进行分析:【百度搜索日志实时分析】,主要业务需求如下三个方面:

  • 业务一:搜索日志数据存储HDFS,实时对日志数据进行ETL提取转换,存储HDFS文件系统;
  • 业务二:百度热搜排行榜Top10,累加统计所有用户搜索词次数,获取Top10搜索词及次数;
  • 业务三:近期时间内热搜Top10,统计最近一段时间范围(比如,最近半个小时或最近2个小时)
    内用户搜索词次数,获取Top10搜索词及次数;
  • 开发Maven Project中目录结构如下所示:

2 初始化环境

编程实现业务之前,首先编写程序模拟产生用户使用百度搜索产生日志数据和创建工具类StreamingContextUtils提供StreamingContext对象与从Kafka接收数据方法。

2.1 创建 Topic

启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic search-log-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.oldlut.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.oldlut.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic search-log-topic --broker-list node1.oldlut.cn:9092
# Consumer
kafka-console-consumer.sh --topic search-log-topic \
--bootstrap-server node1.oldlut.cn:9092 --from-beginning

2.2 模拟日志数据

模拟用户搜索日志数据,字段信息封装到CaseClass样例类【SearchLog】类,代码如下:

package cn.oldlut.spark.app.mock
/**
 * 用户百度搜索时日志数据封装样例类CaseClass
 * <p>
 *
 * @param sessionId 会话ID
 * @param ip        IP地址
 * @param datetime  搜索日期时间
 * @param keyword   搜索关键词
 */
case class SearchLog(
                      sessionId: String, //
                      ip: String, //
                      datetime: String, //
                      keyword: String //
                    ) {
  override def toString: String = s"$sessionId,$ip,$datetime,$keyword"
}
`` `
模拟产生搜索日志数据类 【 MockSearchLogs 】 具体代码如下 :
`` `
package cn.oldlut.spark.app.mock
import java.util.{Properties, UUID}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
/**
 * 模拟产生用户使用百度搜索引擎时,搜索查询日志数据,包含字段为:
 * uid, ip, search_datetime, search_keyword
 */
object MockSearchLogs {
  def main(args: Array[String]): Unit = {
    // 搜索关键词,直接到百度热搜榜获取即可
    val keywords: Array[String] = Array("罗志祥", "谭卓疑", "当当网", "裸海蝶", "张建国")
    // 发送Kafka Topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1.oldlut.cn:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    val random: Random = new Random()
    while (true) {
      // 随机产生一条搜索查询日志
      val searchLog: SearchLog = SearchLog(
        getUserId(), //
        getRandomIp(), //
        getCurrentDateTime(), //
        keywords(random.nextInt(keywords.length)) //
      )
      println(searchLog.toString)
      Thread.sleep(10 + random.nextInt(100))
      val record = new ProducerRecord[String, String]("search-log-topic", searchLog.toString)
      producer.send(record)
    }
    // 关闭连接
    producer.close()
  }
  /**
   * 随机生成用户SessionId
   */
  def getUserId(): String = {
    val uuid: String = UUID.randomUUID().toString
    uuid.replaceAll("-", "").substring(16)
  }
  /**
   * 获取当前日期时间,格式为yyyyMMddHHmmssSSS
   */
  def getCurrentDateTime(): String = {
    val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")
    val nowDateTime: Long = System.currentTimeMillis()
    format.format(nowDateTime)
  }
  /**
   * 获取随机IP地址
   */
  def getRandomIp(): String = {
    // ip范围
    val range: Array[(Int, Int)] = Array(
      (607649792, 608174079), //36.56.0.0-36.63.255.255
      (1038614528, 1039007743), //61.232.0.0-61.237.255.255
      (1783627776, 1784676351), //106.80.0.0-106.95.255.255
      (2035023872, 2035154943), //121.76.0.0-121.77.255.255
      (2078801920, 2079064063), //123.232.0.0-123.235.255.255
      (-1950089216, -1948778497), //139.196.0.0-139.215.255.255
      (-1425539072, -1425014785), //171.8.0.0-171.15.255.255
      (-1236271104, -1235419137), //182.80.0.0-182.92.255.255
      (-770113536, -768606209), //210.25.0.0-210.47.255.255
      (-569376768, -564133889) //222.16.0.0-222.95.255.255
    )
    // 随机数:IP地址范围下标
    val random = new Random()
    val index = random.nextInt(10)
    val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
    //println(s"ipNumber = ${ipNumber}")
    // 转换Int类型IP地址为IPv4格式
    number2IpString(ipNumber)
  }
  /**
   * 将Int类型IPv4地址转换为字符串类型
   */
  def number2IpString(ip: Int): String = {
    val buffer: Array[Int] = new Array[Int](4)
    buffer(0) = (ip >> 24) & 0xff
    buffer(1) = (ip >> 16) & 0xff
    buffer(2) = (ip >> 8) & 0xff
    buffer(3) = ip & 0xff
    // 返回IPv4地址
    buffer.mkString(".")
  }
}

运行应用程序,源源不断产生日志数据,发送至Kafka(同时在控制台打印),截图如下:

2.3 StreamingContextUtils 工具类

所有SparkStreaming应用都需要构建StreamingContext实例对象,并且从采用New KafkaConsumer API消费Kafka数据,编写工具类【StreamingContextUtils】,提供两个方法:

  • 方法一:getStreamingContext,获取StreamingContext实例对象
  • 方法二:consumerKafka,消费Kafka Topic中数据
    具体代码如下:
package cn.oldlut.spark.app
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 工具类提供:构建流式应用上下文StreamingContext实例对象和从Kafka Topic消费数据
 */
object StreamingContextUtils {
  /**
   * 获取StreamingContext实例,传递批处理时间间隔
   *
   * @param batchInterval 批处理时间间隔,单位为秒
   */
  def getStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
    // i. 创建SparkConf对象,设置应用配置信息
    val sparkConf = new SparkConf()
      .setAppName(clazz.getSimpleName.stripSuffix("$"))
      .setMaster("local[3]")
      // 设置Kryo序列化
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))
    // ii.创建流式上下文对象, 传递SparkConf对象和时间间隔
    val context = new StreamingContext(sparkConf, Seconds(batchInterval))
    // iii. 返回
    context
  }
  /**
   * 从指定的Kafka Topic中消费数据,默认从最新偏移量(largest)开始消费
   *
   * @param ssc       StreamingContext实例对象
   * @param topicName 消费Kafka中Topic名称
   */
  def consumerKafka(ssc: StreamingContext, topicName: String): DStream[ConsumerRecord[String, String]] = {
    // i.位置策略
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    // ii.读取哪些Topic数据
    val topics = Array(topicName)
    // iii.消费Kafka 数据配置参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1.oldlut.cn:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id_streaming_0001",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // iv.消费数据策略
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
      topics, kafkaParams
    )
    // v.采用新消费者API获取数据,类似于Direct方式
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, locationStrategy, consumerStrategy
    )
    // vi.返回DStream
    kafkaDStream
  }
}

3 实时数据ETL存储

实时从Kafka Topic消费数据,提取ip地址字段,调用【ip2Region】库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为10秒,完整代码如下:

package cn.oldlut.spark.app.etl
import cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}
/**
 * 实时消费Kafka Topic数据,经过ETL(过滤、转换)后,保存至HDFS文件系统中,BatchInterval为:10s
 */
object StreamingETLHdfs {
  def main(args: Array[String]): Unit = {
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 10)
    // 2. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
    // 3. 数据ETL:过滤不合格数据及转换IP地址为省份和城市,并存储HDFS上
    kafkaDStream.foreachRDD { (rdd, time) =>
      // i. message不为null,且分割为4个字段
      val kafkaRDD: RDD[ConsumerRecord[String, String]] = rdd.filter { record =>
        val message: String = record.value()
        null != message && message.trim.split(",").length == 4
      }
      // ii. 解析IP地址
      val etlRDD: RDD[String] = kafkaRDD.mapPartitions { iter =>
        // 创建DbSearcher对象,针对每个分区创建一个,并不是每条数据创建一个
        val dbSearcher = new DbSearcher(new DbConfig(), "dataset/ip2region.db")
        iter.map { record =>
          val Array(_, ip, _, _) = record.value().split(",")
          // 依据IP地址解析
          val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
          val region: String = dataBlock.getRegion
          val Array(_, _, province, city, _) = region.split("\\|")
          // 组合字符串
          s"${record.value()},$province,$city"
        }
      }
      // iii. 保存至文件
      val savePath = s"datas/streaming/etl/search-log-${time.milliseconds}"
      if (!etlRDD.isEmpty()) {
        etlRDD.coalesce(1).saveAsTextFile(savePath)
      }
    }
    // 4.启动流式应用,一直运行,直到程序手动关闭或异常终止
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

运行模拟日志数据程序和ETL应用程序,查看实时数据ETL后保存文件,截图如下:

4 实时状态更新统计

实 时 累 加 统 计 用 户 各 个 搜 索 词 出 现 的 次 数 , 在 SparkStreaming 中 提 供 函 数【updateStateByKey】实现累加统计,Spark 1.6提供【mapWithState】函数状态统计,性能更好,实际应用中也推荐使用。

4.1 updateStateByKey 函数

状态更新函数【updateStateByKey】表示依据Key更新状态,要求DStream中数据类型为【Key/Value】对二元组,函数声明如下:

将每批次数据状态,按照Key与以前状态,使用定义函数【updateFunc】进行更新,示意图如下:

文档: http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#updatestatebykey-operation

针对搜索词词频统计WordCount,状态更新逻辑示意图如下:

以前的状态数据,保存到Checkpoint检查点目录中,所以在代码中需要设置Checkpoint检查点目录:

完整演示代码如下:

package cn.oldlut.spark.app.state
import cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object StreamingUpdateState {
  def main(args: Array[String]): Unit = {
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
    // TODO: 设置检查点目录
    ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
    // 2. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
    // 3. 对每批次的数据进行搜索词次数统计
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      val reduceRDD = rdd
        // 过滤不合格的数据
        .filter { record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      reduceRDD // 返回
    }
    /*
    def updateStateByKey[S: ClassTag](
    // 状态更新函数
    updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)]
    第一个参数:Seq[V]
    表示的是相同Key的所有Value值
    第二个参数:Option[S]
    表示的是Key的以前状态,可能有值Some,可能没值None,使用Option封装
    S泛型,具体类型有业务具体,此处是词频:Int类型
    */
    val stateDStream: DStream[(String, Int)] = reduceDStream.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => {
        // a. 获取以前状态信息
        val previousState = state.getOrElse(0)
        // b. 获取当前批次中Key对应状态
        val currentState = values.sum
        // c. 合并状态
        val latestState = previousState + currentState
        // d. 返回最新状态
        Some(latestState)
      }
    )
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    stateDStream.print()
    // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

运行应用程序,通过WEB UI界面可以发现,将以前状态保存到Checkpoint检查点目录中,更新时在读取。

此外,updateStateByKey函数有很多重载方法,依据不同业务需求选择合适的方式使用。

4.2 mapWithState 函数

Spark 1.6提供新的状态更新函数【mapWithState】,mapWithState函数也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。

这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高;

ae55e536ed4d4e40940c5d3db1ef7309.png

需要构建StateSpec对象,对状态State进行封装,可以进行相关操作,类的声明定义如下:

状态函数【mapWithState】参数相关说明:

完整演示代码如下:

package cn.oldlut.spark.app.state
import cn.oldlut.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object StreamingMapWithState {
  def main(args: Array[String]): Unit = {
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, 5)
    // TODO: 设置检查点目录
    ssc.checkpoint(s"datas/streaming/state-${System.nanoTime()}")
    // 2. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
    // 3. 对每批次的数据进行搜索词进行次数统计
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter { record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // TODO: 4、实时累加统计搜索词搜索次数,使用mapWithState函数
    /*
    按照Key来更新状态的,一条一条数据的更新状态
    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType]
    a. 通过函数源码发现参数使用对象
    StateSpec 实例对象
    b. StateSpec
    表示对状态封装,里面涉及到相关数据类型
    c. 如何构建StateSpec对象实例呢??
    StateSpec 伴生对象中function函数构建对象
    def function[KeyType, ValueType, StateType, MappedType](
    // 从函数名称可知,针对每条数据更新Key的转态信息
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
    ): StateSpec[KeyType, ValueType, StateType, MappedType]
    */
    // 状态更新函数,针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新省份销售订单额
        (keyword, latestState)
      }
    )
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    stateDStream.print()
    // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

运行程序可以发现,当Key(搜索单词)没有出现时,不会更新状态,仅仅更新当前批次中出现的Key的状态。

mapWithState 实现有状态管理主要是通过两点:a)、历史状态需要在内存中维护,这里必需的了,updateStateBykey也是一样;b)、自定义更新状态的mappingFunction,这些就是具体的业务功能实现逻辑了(什么时候需要更新状态)


33a4ee8b3a4f4f90ba00dd74139b08e2.png

首先数据像水流一样从左侧的箭头流入,把mapWithState看成一个转换器的话,mappingFunc就是转换的规则,流入的新数据(key-value)结合历史状态(通过key从内存中获取的历史状态)进行一些自定义逻辑的更新等操作,最终从红色箭头中流出。

5 实时窗口统计

SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档:

http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#window-operations

在实际项目中,很多时候需求:每隔一段时间统计最近数据状态,并不是对所有数据进行统计,称为趋势统计或者窗口统计,SparkStreaming中提供相关函数实现功能,业务逻辑如下:

针对用户百度搜索日志数据,实现【近期时间内热搜Top10】,统计最近一段时间范围(比如,最近半个小时或最近2个小时)内用户搜索词次数,获取Top10搜索词及次数。窗口函数【window】声明如下,包含两个参数:窗口大小(WindowInterval,每次统计数据范围)和滑动大小(每隔多久统计一次),都必须是批处理时间间隔BatchInterval整数倍。

4e1fa1911431414fba2fbe93a3017613.png

案例完整实现代码如下,为了演示方便,假设BatchInterval为2秒,WindowInterval

为4秒,SlideInterval为2秒。

package cn.oldlut.spark.app.window
import cn.oldlut.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
 * 批处理时间间隔:BatchInterval = 2s
 * 窗口大小间隔:WindowInterval = 4s
 * 滑动大小间隔:SliderInterval = 2s
 */
object StreamingWindow {
  def main(args: Array[String]): Unit = {
    // Streaming应用BatchInterval
    val BATCH_INTERVAL: Int = 2
    // Streaming应用窗口大小
    val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
    val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
    // 2. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[String] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
      .map(record => record.value())
    // TODO: 添加窗口,设置对应参数
    /*
    def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
    警告信息:
    ERROR KafkaRDD: Kafka ConsumerRecord is not serializable.
    Use .map to extract fields before calling .persist or .window
    */
    val windowDStream: DStream[String] = kafkaDStream.window(
      Seconds(WINDOW_INTERVAL), Seconds(SLIDER_INTERVAL)
    )
    // 4. 对每批次的数据进行搜索词进行次数统计
    val countDStream: DStream[(String, Int)] = windowDStream.transform { rdd =>
      val resultRDD = rdd
        // 过滤不合格的数据
        .filter(message => null != message && message.trim.split(",").length == 4)
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { message =>
          val keyword: String = message.trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item)
      // 返回
      resultRDD
    }
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    countDStream.print()
    // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

SparkStreaming中同时提供将窗口Window设置与聚合reduceByKey合在一起的函数,为了更加方便编程。

使用【reduceByKeyAndWindow】函数,修改上述代码,实现窗口统计,具体代码如下:

package cn.oldlut.spark.app.window
import cn.oldlut.spark.app.StreamingContextUtils
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 实时消费Kafka Topic数据,每隔一段时间统计最近搜索日志中搜索词次数
 * 批处理时间间隔:BatchInterval = 2s
 * 窗口大小间隔:WindowInterval = 4s
 * 滑动大小间隔:SliderInterval = 2s
 */
object StreamingReduceWindow {
  def main(args: Array[String]): Unit = {
    // Streaming应用BatchInterval
    val BATCH_INTERVAL: Int = 2
    // Streaming应用窗口大小
    val WINDOW_INTERVAL: Int = BATCH_INTERVAL * 2
    val SLIDER_INTERVAL: Int = BATCH_INTERVAL * 1
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = StreamingContextUtils.getStreamingContext(this.getClass, BATCH_INTERVAL)
    // 2. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[String] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
      .map(recored => recored.value())
    // 3. 对每批次的数据进行搜索词进行次数统计
    val etlDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      val etlRDD = rdd
        // 过滤不合格的数据
        .filter(message => null != message && message.trim.split(",").length == 4)
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { message =>
          val keyword: String = message.trim.split(",").last
          keyword -> 1
        }
      etlRDD // 返回
    }
    // 4. 对获取流式数据进行ETL后,使用窗口聚合函数统计计算
    /*
    def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V, // 聚合函数
    windowDuration: Duration, // 窗口大小
    slideDuration: Duration // 滑动大小
    ): DStream[(K, V)]
    */
    val resultDStream: DStream[(String, Int)] = etlDStream.reduceByKeyAndWindow(
      (tmp: Int, value: Int) => tmp + value, //
      Seconds(WINDOW_INTERVAL), //
      Seconds(SLIDER_INTERVAL) //
    )
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    resultDStream.print()
    // 6.启动流式应用,一直运行,直到程序手动关闭或异常终止
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}


目录
相关文章
|
22天前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
1月前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
1月前
|
存储 消息中间件 大数据
Go语言在大数据处理中的实际应用与案例分析
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理中的实际应用,通过案例分析展示了Go语言在处理大数据时的优势和实践效果。文章首先介绍了大数据处理的挑战与需求,然后详细分析了Go语言在大数据处理中的适用性和核心技术,最后通过具体案例展示了Go语言在大数据处理中的实际应用。
|
15天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
3天前
|
机器学习/深度学习 前端开发 数据挖掘
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断(下)
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
74 11
|
9天前
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断2
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
15 0
|
9天前
|
机器学习/深度学习 前端开发 数据挖掘
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
39 0
|
16天前
|
机器学习/深度学习 人工智能 安全
Azure Databricks实战:在云上轻松进行大数据分析与AI开发
【4月更文挑战第8天】Databricks在大数据分析和AI开发中表现出色,简化流程并提高效率。文中列举了三个应用场景:数据湖分析、实时流处理和AI机器学习,并阐述了Databricks的一体化平台、云原生弹性及企业级安全优势。博主认为,Databricks提升了研发效能,无缝集成Azure生态,并具有持续创新潜力,是应对大数据挑战和加速AI创新的理想工具。
41 0
|
21天前
|
SQL 存储 监控
日志问题精要:分析与总结
该文档讲述了应用系统日志记录的重要性和规则。主要目的是记录操作轨迹、监控系统状态和回溯故障。日志记录点包括系统入口、调用其他模块、调用结束、出口和出错时。内容应遵循UTF-8编码,避免敏感信息,按INFO级别记录,及时、完整且安全。日志输出要控制频率和长度,不影响系统性能,并按策略备份和清理。日志等级分为DEBUG、INFO、WARN、ERROR和FATAL。日志文件应有明确目录结构,大小有限制,并定期清理。注意事项包括输出异常堆栈、避免打印对象实例的hashCode、选择合适的日志框架和格式,并支持动态修改日志级别。还要实现链路追踪,确保在多线程环境中正确记录日志。
20 0