学习笔记:StructuredStreaming入门(十二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 学习笔记:StructuredStreaming入门(十二)

Spark Day12:Structured Streaming

01-[了解]-上次课程内容回顾

主要讲解SparkStreaming如何企业开发:集成Kafka、三大应用场景(实时增量ETL、状态累加统计、窗口分析统计)。

1、集成Kafka
  由于Kafka框架提供2套Consumer API,所以集成Kafka时,也提供2套API,但是推荐使用New Consumer API
    - KafkaConsumer
    - ConsumerRecord<Key, Value>,都是String类型
  http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
    GAV:org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5
  方法:
    val kafkaDStream: DStream[String, String] = KafkaUtils.createDirectStream
    直接从Kafka消费数据获取数据流中,每批次RDD是KafkaRDD
  原理:
    每批次BatchInterval时间间隔,依据偏移量范围到Kafka Topic中各个分区获取相应范围数据

从Kafka消费数据时,属性设置:"enable.auto.commit" -> (false: java.lang.Boolean)

Kafka New Consumer API,默认情况下,提供一种机制,从Kafka Topic消费数据以后,可以定时异步或同步将消费偏移量信息存储到:__consumer__offsets,当设置属性为false时,表示不需要提交保存偏移量

从Kafka消费数据时,不仅可以指定某个Topic获取或某些Topic,而且还有指定正则表达式,很方便消费多个Topic

SparkStreaming流式计算模块,在实际项目中有3大应用场景:主要如下所示

2、实时增量ETL,【实际项目中,此种应用类型最多】
  实时将海量业务数据,进行实时ETL转换,存储到外部存储引擎,以便系统进行分析处理
    业务数据一产生发送到 Kafka Topic -> 流式应用程序:ETL转换 -> HBase/ES  
  使用2个函数:
      transform转换函数,针对每批次RDD进行转换处理,返回还是RDD
      foreachRDD输出函数,针对每批次RDD进行输出,返回值为Unit
    输出函数模式:
      dstream.forearchRDD((rdd, batchTime) => {
        // 每批次RDD针对每个分区数据进行操作,适当考虑是否降低分区数目
        rdd.coalease(1).forearchPartition{iter =>
          // 从连接池中获取连接
          val conn: Connection 
          // 将每个分区数据进行保存,考虑批量保存
          iter.foreach{item =>
          }
          // 将连接放回连接池中
          conn.release()
        }
      })
3、状态累加统计
  实时对数据进行聚合操作,并且状态属于累加统计的,比如双11大屏计算销售额
    updateStateByKey 函数
      依据Key更新状态的,需要定义状态更新函数,表示如何更新状态
      updateFunc:
        (values: Seq[V], state: Option[S]) => Option[S]
          values: 表示当前批次中Key对应的所有Value的值
          state:表示当前Key以前的状态,如果没有状态就是None
    mapWithState 函数
      依据Key更新状态,当Key存在时,才更新状态,否则不更新,性能远远由于updateStateByKey
      StateSpec对象
        StateSpec.function函数创建实例,传递map映射函数,针对每条数据进行状态更新处理
          (KeyType, Option[ValueType], State[StateType]) => MappedType
          Key类型   Value值数据类型       状态数据类型         返回数据类型
  保存以前状态State,所以设置Checkpoint检查的目录,存储State数据
    ssc.checkpoint("datas/streaming-ckpt-999999")
4、窗口分析统计
  描述需求:每隔多长时间,统计最近多久范围内数据情况(趋势统计)
    比如每隔1分钟统计最近20分钟内数据情况
  窗口统计:
    window size = 20 分钟
    slide size = 1 分钟
  分为2种类型窗口:
    当window size = slide size : 滚动窗口,数据不会被重复处理
    当window sieze > slide size : 滑动窗口,数据会被重复处理
  函数:
      window函数,设置窗口大小和滑动大小
      将聚合函数和窗口函数合在一起:
        reduceByKeyAndWindow
          窗口大小和滑动大小,还需要设置聚合函数

02-[了解]-今日课程内容提纲

2个方面内容:偏移量管理(Checkpoint检查点)和StructuredStreaming入门(新的流式计算模块)

1、偏移量管理
  SparkStreaming从Kafka消费数据时,如何管理偏移量,实现实时流式应用容灾恢复
  方式一:
    Checkpoint检查点恢复偏移量,继续消费数据
  方式二:
    用户手动管理偏移量,进行存储和读取,续集消费数据
    推荐此种方式,相当来说比较麻烦,了解思路即可
【此部分内容,属于SparkStreaming模块处理流式数据一个不足之处,一大软点,使得用户不喜欢框架】
2、StructuredStreaming 快速入门
  数据结构:DataFrame/Dataset,流式数据集
  - 2.x提出结构化流模块处理流式数据
    SparkStreaming不足之处
    StructuredStreaming 设计原理和编程模型
  - 入门案例:词频统计WordCount
    实时累加统计
    代码就是SparkSQL词频统计代码(DSL和SQL)
  - 内置数据源,了解即可,几乎项目不用
  - StructuredStreaming应用程序基本设置

03-[理解]-偏移量管理之引例和概述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6wtQxLP6-1626354186973)(/img/image-20210506154426999.png)]

  • 方式一:Checkpoint 恢复

  • 方式二:手动管理偏移量和加载状态

程序中指定加载上一次状态信息,继续运行累加计算状态。

04-[理解]-偏移量管理之重构代码

实际项目开发中,为了代码重构复用和代码简洁性,将【从数据源读取数据、实时处理及结果输出】封装到方法【processData】中,类的结构如下:

Streaming流式应用模板完整代码:

package cn.itcast.spark.app.ckpt
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object _01StreamingTemplate {
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   * @param ssc 流式上下文StreamingContext实例对象
   */
  def processData(ssc: StreamingContext): Unit = {
    // 1. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
    // 2. 对每批次的数据进行搜索词进行次数统计
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter{ record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map{record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 3、实时累加统计搜索词搜索次数,使用mapWithState函数
    /*
      def mapWithState[StateType: ClassTag, MappedType: ClassTag](
          spec: StateSpec[K, V, StateType, MappedType]
        ): MapWithStateDStream[K, V, StateType, MappedType]
     */
    // 状态更新函数,针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新搜索次数
        (keyword, latestState)
      }
    )
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    stateDStream.print()
  }
  def main(args: Array[String]): Unit = {
    // 1. 获取StreamingContext实例对象
    /*
      def getActiveOrCreate(
          checkpointPath: String, // 检查点目录
          creatingFunc: () => StreamingContext,
          hadoopConf: Configuration = SparkHadoopUtil.get.conf,
          createOnError: Boolean = false
        ): StreamingContext
     */
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
        // TODO: 设置消费最大数量
        /*
          每秒钟消费每个分区数据最大量:1W
            topic: 3个分区,batchInterval:5s
          问:每批次数据消费最大量是多少?
            1w * 3 * 5 = 15W
         */
        .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      // b. 传递SparkConf和BatchInterval创建流式上下对象
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回实例对象
      context
    }
    // TODO: 设置检查点目录
    ssc.checkpoint("datas/streaming/state-8888")
    // TODO:实时处理流式数据
    processData(ssc)
    // TODO: 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

扩展知识点:Scala语言中设计模式【贷出模式

  • 贷出函数:管理资源【获取资源和关闭资源】
  • 用户函数:业务逻辑实现地方
  • MAIN方法,调用贷出函数,将用户函数传递给贷出函数

05-[理解]-偏移量管理之Checkpoint编码实现

针对Spark Streaming状态应用程序,设置Checkpoint检查点目录,其中存储两种类型数据:

Metadata Checkpointing 用来恢复 DriverData Checkpointing用来容错stateful的数据处理失败的场景 。

当我们再次运行Streaming Application时,只要从Checkpoint 检查点目录恢复,构建StreamingContext应用,就可以继续从上次消费偏移量消费数据。

使用StreamingContext中【getActiveOrCreate】方法构建StreamingContext实例对象,方法声明如下:

若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。

修改上述案例代码:

package cn.itcast.spark.app.ckpt
import cn.itcast.spark.app.StreamingContextUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object _02StreamingStateCkpt {
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   * @param ssc 流式上下文StreamingContext实例对象
   */
  def processData(ssc: StreamingContext): Unit = {
    // 1. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = StreamingContextUtils
      .consumerKafka(ssc, "search-log-topic")
    // 2. 对每批次的数据进行搜索词进行次数统计
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter{ record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map{record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 3、实时累加统计搜索词搜索次数,使用mapWithState函数
    /*
      def mapWithState[StateType: ClassTag, MappedType: ClassTag](
          spec: StateSpec[K, V, StateType, MappedType]
        ): MapWithStateDStream[K, V, StateType, MappedType]
     */
    // 状态更新函数,针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新搜索次数
        (keyword, latestState)
      }
    )
    // 表示,在启动应用时,可以初始化状态,比如从Redis中读取状态数据,转换为RDD,进行赋值初始化操作
    /*
     def initialState(rdd: RDD[(KeyType, StateType)]): this.type
     */
    //spec.initialState()
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream
      .mapWithState(spec)
      .filter(tuple => tuple._2 >= 10)
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    stateDStream.print()
  }
  def main(args: Array[String]): Unit = {
    //TODO: 检查点目录
    val CKPT_DIR: String = "datas/streaming/state-1000"
    // 1. 获取StreamingContext实例对象
    /*
      def getActiveOrCreate(
          checkpointPath: String, // 检查点目录
          creatingFunc: () => StreamingContext,
          hadoopConf: Configuration = SparkHadoopUtil.get.conf,
          createOnError: Boolean = false
        ): StreamingContext
     */
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      CKPT_DIR, // 如果目录存在,从Checkpoint数据恢复构建StreamingContext对象,包括DStream创建、转换和输出
      // 匿名函数,函数参数没有,返回值要求:StreamingContext对象
      () => { // CKPT不存在时,调用此函数构建StreamingContext对象,读取数据,转换和输出
        // a. 创建SparkConf对象,设置应用配置信息
        val sparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[3]")
          // 设置消费最大数量
          .set("spark.streaming.kafka.maxRatePerPartition", "10000")
        // b. 传递SparkConf和BatchInterval创建流式上下对象
        val context = new StreamingContext(sparkConf, Seconds(5))
        // c. TODO: 处理数据
        processData(context)
        // d. 返回流式上下文对象
        context
      }
    )
    // TODO: 设置检查点目录
    ssc.checkpoint(CKPT_DIR)
    // TODO: 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

当Streaming Application再次运行时,从Checkpoint检查点目录恢复时,有时有问题,比如修改程序,再次从运行时,可能出现类型转换异常,如下所示:

原因在于修改DStream转换操作,在检查点目录中存储的数据没有此类的相关代码,ClassCastException异常。

此时无法从检查点读取偏移量信息和转态信息,所以SparkStreaming中Checkpoint功能,属于鸡肋,食之无味,弃之可惜。

06-[理解]-偏移量管理之手动管理偏移量和状态思路

SparkStreaming中Checkpoint功能,属于鸡肋,食之无味,弃之可惜

  • 解决问题一:状态State,针对实数累计统计来说,再次运行流式应用,获取上次状态
  • 解决问题二:偏移量,从Kafka消费数据位置,再次运行应用时,继续上次消费位置消费数据

解决方案:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WBJj1lCY-1626354186980)(/img/image-20210506164820304.png)]

当运行流式应用程序时,首先从状态存储系统获取状态数据,进行状态初始化操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GwzyGO7b-1626354186981)(/img/image-20210506164851608.png)]

保存每批次数据偏移量信息到存储系统中,比如MySQL表、Zookeeper节点等,当再次运行流式应用时,从保存系统加载偏移量消息,继续消费数据。

考虑第一个问题:状态恢复如何完成呢???从存储状态系统加载状态State,进行初始化操作。

07-[理解]-偏移量管理之MySQL存储偏移量

此处将偏移量数据存储到MySQL表中,数据库及表的DDL和DML语句如下:

-- 1. 创建数据库的语句
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
USE db_spark ;
-- 2. 创建表的语句
CREATE TABLE `tb_offset` (
`topic` varchar(255) NOT NULL,
`partition` int NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint NOT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ;
-- 3. 插入数据语句replace
replace into tb_offset (`topic`, `partition`, `groupid`, `offset`) values(?, ?, ?, ?)
--/*
--
replace语句执行时,分以下两种情况:
--
- 情况1:insert,当不存或唯一索引冲突,相当于insert操作
--
- 情况2:delete and insert,当存在主键冲突或唯一索引冲突,相当于delete操作,加insert操作
--*/
-- 4. 查询数据语句select
select * from tb_offset where topic in ('xx', 'yy') AND groupid = 'gid001' ;
select * from tb_offset where topic in (?) and groupid = ? ;

工具类OffsetsUtils从MySQL数据库表中读取消费的偏移量信息和保存最近消费的偏移量值,示意图如下所示:

工 具 类 中 包 含 如 何 保 存 偏 移 量 【 saveOffsetsToTable 】 和 读 取 偏 移 量【getOffsetsToMap】两个函数,具体代码如下:

package cn.itcast.spark.app.offset
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.mutable
/**
 * 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
 */
object OffsetsUtils {
  /**
   * 依据Topic名称和消费组GroupId获取各个分区的偏移量
   *
   *@param topicNames Topics名称
   *@param groupId 消费组ID
   **/
  def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] ={
    // 构建集合
    val map: mutable.Map[TopicPartition, Long] = scala.collection.mutable.Map[TopicPartition, Long]()
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    var result: ResultSet = null
    try{
      // a. 加载驱动类
      Class.forName("com.mysql.cj.jdbc.Driver")
      // b. 获取连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
        "root", //
        "123456" //
      )
      // c. 编写SQL,获取PreparedStatement对象
      // Array("1-topic", "2-topic") ->  topic in ("1-topic", "2-topic")
      val topicNamesStr = topicNames.map(topicName => s"\'$topicName\'").mkString(", ")
      val querySQL =
        s"""
           |SELECT
           |  `topic`, `partition`, `groupid`, `offset`
           |FROM
           |  db_spark.tb_offset
           |WHERE
           |  groupid = ? AND topic in ($topicNamesStr)
           |""".stripMargin
      pstmt = conn.prepareStatement(querySQL)
      pstmt.setString(1, groupId)
      // d. 查询数据
      result = pstmt.executeQuery()
      // e. 遍历获取值
      while (result.next()){
        val topicName = result.getString("topic")
        val partitionId = result.getInt("partition")
        val offset = result.getLong("offset")
        // 加入集合中
        map += new TopicPartition(topicName, partitionId) -> offset
      }
    }catch {
      case e: Exception => e.printStackTrace()
    }finally {
      if(null != result) result.close()
      if(null != pstmt) pstmt.close()
      if(null != conn) conn.close()
    }
    // 返回集合,转换为不可变的
    map.toMap
  }
  /**
   * 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
   *
   * @param offsetRanges Topic中各个分区消费偏移量范围
   * @param groupId 消费组ID
   */
  def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try{
      // a. 加载驱动类
      Class.forName("com.mysql.cj.jdbc.Driver")
      // b. 获取连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true", //
        "root", //
        "123456" //
      )
      // c. 编写SQL,获取PreparedStatement对象
      val insertSQL = "replace into db_spark.tb_offset (`topic`, `partition`, `groupid`, `offset`) values (?, ?, ?, ?)"
      pstmt = conn.prepareStatement(insertSQL)
      // d. 设置参数
      offsetRanges.foreach{offsetRange =>
        println(offsetRange.toString())
        pstmt.setString(1, offsetRange.topic)
        pstmt.setInt(2, offsetRange.partition)
        pstmt.setString(3, groupId)
        pstmt.setLong(4, offsetRange.untilOffset)
        // 加入批次
        pstmt.addBatch()
      }
      // e. 批量插入
      pstmt.executeBatch()
    }catch {
      case e: Exception => e.printStackTrace()
    }finally {
      if(null != pstmt) pstmt.close()
      if(null != conn) conn.close()
    }
  }
}

从Kafka Topic消费数据时,首先从MySQL数据库加载偏移量,如果有值,使用如下函数:

从Kafka Topic消费数据时,直接获取的DStream中每批次RDD都是KafkaRDD,可以获取数据偏移量范围信息OffsetRanges。

修改前面实时订单消费额统计代码,自己管理消费偏移量,存储到MySQL表中,代码如下:

package cn.itcast.spark.app.offset
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * 实时消费Kafka Topic数据,累加统计各个搜索词的搜索次数,实现百度搜索风云榜
 */
object _03StreamingManagerOffsets {
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   * @param ssc 流式上下文StreamingContext实例对象
   */
  def processData(ssc: StreamingContext): Unit = {
    val groupId = "gui_0001" // 消费组ID
    // 1. 从Kafka消费数据,使用Kafka New Consumer API
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
      // i.位置策略
      val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
      // ii.读取哪些Topic数据
      val topics = Array("search-log-topic")
      // iii.消费Kafka 数据配置参数
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node1.itcast.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      // iv.消费数据策略
      /*
        1. 从MySQL表加载偏移量
        2. 判断是否有值,如果没有值,表示第一次消费数据,从最新偏移量开始
        3. 如果有值,从指定偏移量消费数据
       */
      // TODO: a. 从MySQL表加载偏移量
      val map: Map[TopicPartition, Long] = OffsetsUtils.getOffsetsToMap(topics, groupId)
      val consumerStrategy: ConsumerStrategy[String, String] = if(map.isEmpty){
        // TODO: b. 判断是否有值,如果没有值,表示第一次消费数据,从最新偏移量开始
        ConsumerStrategies.Subscribe(topics, kafkaParams)
      }else{
        // TODO: c.  如果有值,从指定偏移量消费数据
        ConsumerStrategies.Subscribe(topics, kafkaParams, map)
      }
      // v.采用新消费者API获取数据,类似于Direct方式
      val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
        ssc, locationStrategy, consumerStrategy
      )
      // vi.返回DStream
      kafkaDStream
    }
    // TODO:其一、定义数组存储每批次数据对应RDD中各个分区的Topic Partition中偏移量信息
    var offsetRanges: Array[OffsetRange] = Array.empty
    // 2. 对每批次的数据进行搜索词进行次数统计
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform{ rdd =>
      // TODO:其二、直接从Kafka获取的每批次KafkaRDD中获取偏移量信息
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter{ record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map{record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 3、实时累加统计搜索词搜索次数,使用mapWithState函数
    /*
      def mapWithState[StateType: ClassTag, MappedType: ClassTag](
          spec: StateSpec[K, V, StateType, MappedType]
        ): MapWithStateDStream[K, V, StateType, MappedType]
     */
    // 状态更新函数,针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新搜索次数
        (keyword, latestState)
      }
    )
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // 5. 将结果数据输出 -> 将每批次的数据处理以后输出
    stateDStream.foreachRDD{(resultRDD, batchTime) =>
      // 将batchTime进行转换:yyyy-MM-dd HH:mm:ss
      val formatTime = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
        .format(batchTime.milliseconds)
      println("-------------------------------------------")
      println(s"Time: $formatTime")
      println("-------------------------------------------")
      // 判断结果RDD是否有数据,没有数据不要打印
      if(!resultRDD.isEmpty()){
        // TODO: 针对RDD数据进行输出,以前在SparkCore中怎么编写此处就编写
        resultRDD.coalesce(1).foreachPartition(iter => iter.foreach(println))
      }
      // TODO: 其三、当每批次结果RDD保存至外部存储系统以后,保存偏移量
      OffsetsUtils.saveOffsetsToTable(offsetRanges, groupId)
    }
  }
  def main(args: Array[String]): Unit = {
    // 1. 获取StreamingContext实例对象
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
        // TODO: 设置消费最大数量
        .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      // b. 传递SparkConf和BatchInterval创建流式上下对象
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回实例对象
      context
    }
    // TODO: 设置检查点目录
    ssc.checkpoint("datas/streaming/state-9999")
    // TODO: 调用处理流式数据代码
    processData(ssc)
    // TODO: 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

实际项目中将偏移量保存至Zookeeper上或者Redis中,原因如下:

08-[了解]-Spark Streaming不足

StructuredStreaming结构化流:

  • 第一点、从Spark 2.0开始出现新型的流式计算模块
  • 第二点、Spark 2.2版本,发布Release版本,可以用于实际生产环境中
  • 第三点、Spark 2.3版本,提供ContinuesProcessing持续流处理,原生流处理模式,来一条数据处理一条数据,达到实时性

本质上,这是一种micro-batch(微批处理)的方式处理,用批的思想去处理流数据。这种设计让Spark Streaming面对复杂的流式处理场景时捉襟见肘。

Spark Streaming 存在哪些不足,总结一下主要有下面几点:

第一点:使用 Processing Time 而不是 Event Time
  窗口分析时
  使用处理时间进行窗口分析不合理的
            每个小时订单销售额       23  -  00 
                23:59:58 产生订单   
            流式应用程序接受时间              00  -  01    
                00:00:05 获取数据
第二点:Complex, low-level api
  编程复杂,底层RDD
第三点:reason about end-to-end application
  很难支持流式应用端到端精确性一次语义
  DStream 只能保证自己的一致性语义是 exactly-once 的
第四点:批流代码不统一
  批处理:Dataset、DataFrame
  流计算:DStream

流式计算一直没有一套标准化、能应对各种场景的模型,直到2015年Google发表了TheDataflow Model的论文( https://yq.aliyun.com/articles/73255 )。

09-[掌握]-Structured Streaming编程模型

Structured Streaming是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。

结构化流StructuredStreaming模块仅仅就是SparkSQL中针对流式数据处理功能模块而已。

1、流式处理引擎,基于SparkSQL引擎之上
  DataFrame/Dataset
  处理数据时,使用Catalyst优化器
2、富有的、统一的、高级API
  DataFrame/Dataset
    数据源、数据处理、数据输出
  DSL或SQL分析数据
3、数据源比较丰富
  提供一套流式数据源接口,只要实现,就可以流式读取和保存

Structured Streaming 在 Spark 2.0 版本于 2016 年引入,设计思想参考很多其他系统的思想,

Structured Streaming 和其他系统的显著区别主要如下:

编程模型:将流式数据当做一张没有限制(无界)表,将源源不断地数据追加到表中,默认情况下,只要表中一有数据(有1条数据或多条数据),就会立即进行处理分析(增量处理,本质来说,还是微批处理,底层使用SparkSQL引擎)。

在这个模型中,主要存在下面几个组成部分:
1、第一部分:unbounded table(input table)
  输入表,将流式数据放在表中
2、第二部分:Query(查询)
  当输入表input table中一有数据,立即处理分析
  增量查询分析
3、第三部分:Result Table
  Query 产生的结果表
4、第四部分:Output
  Result Table 的输出,依据设置的输出模式OutputMode输出结果;

Structured Streaming最核心的思想就是将实时到达的数据看作是一个不断追加的unbound table无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中,用静态结构化数据的批处理查询方式进行流计算。

10-[掌握]-入门案例WordCount之功能演示

需求:入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。

以词频统计WordCount案例,Structured Streaming实时处理数据的示意图如下,各行含义:

第一行、表示从TCP Socket不断接收数据,使用【nc -lk 9999】;
第二行、表示时间轴,每隔1秒进行一次数据处理;
第三行、可以看成是“input unbound table",当有新数据到达时追加到表中;
第四行、最终的wordCounts是结果表,新数据到达后触发查询Query,输出的结果;
第五行、当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;该示例设置为CompleteMode,因此每次都将所有数据输出到控制台;

使用Structured Streaming处理实时数据时,会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新Result Table

运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:

演示运行案例步骤:

  • 第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999
  • 第二步、打开另一个终端Terminal,执行如下命令
# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.itcast.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive

发送数据以后,最终统计输出结果如下:

11-[掌握]-入门案例WordCount之编程实现

需求:编程使用StructuredStreaming词频统计WordCount程序,从TCP Socket消费数据,最终结果打印控制台

  • Socket 数据源

  • Console 接收器

第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
package cn.itcast.spark.start
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
   * 第一点、程序入口SparkSession,加载流式数据:spark.readStream
   * 第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
   * 第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
 */
object _04StructuredWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: step1. 构建SparkSession实例对象,相关配置进行设置
    val spark: SparkSession = SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        .config("spark.sql.shuffle.partitions", "2")
        .getOrCreate()
    import spark.implicits._
    // TODO: step2. 从TCP Socket加载数据,读取数据列名称为value,类型是String
    val inputStreamDF: DataFrame = spark.readStream
        .format("socket")
        .option("host", "node1.itcast.cn")
        .option("port", "9999")
        .load()
    /*
    root
      |-- value: string (nullable = true)
     */
    //inputStreamDF.printSchema()
    // TODO: step3. 进行词频统计
    /*
      table: words , column: value
          SQL: SELECT word, COUNT(1) AS count  FROM words GROUP BY word
     */
    val resultStreamDF: DataFrame =  inputStreamDF
      // hadoop spark hadoop spark spark  -> 分割单词,并且扁平化
      .select(explode(split(trim($"value"), "\\s+")).as("word"))
      .groupBy("word").count()
    /*
    root
     |-- word: string (nullable = true)
     |-- count: long (nullable = false)
     */
    // TODO: step4. 将结果输出(ResultTable结果输出,此时需要设置输出模式)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update()) // 表示当词频更新时,再输出
      .format("console")
        .option("numRows", "10")
        .option("truncate", "false")
      // 启动流式应用
      .start()
    // TODO: step5. 启动流式应用后,等待终止
    query.awaitTermination()
    query.stop()
  }
}
amDF: DataFrame = spark.readStream
        .format("socket")
        .option("host", "node1.itcast.cn")
        .option("port", "9999")
        .load()
    /*
    root
      |-- value: string (nullable = true)
     */
    //inputStreamDF.printSchema()
    // TODO: step3. 进行词频统计
    /*
      table: words , column: value
          SQL: SELECT word, COUNT(1) AS count  FROM words GROUP BY word
     */
    val resultStreamDF: DataFrame =  inputStreamDF
      // hadoop spark hadoop spark spark  -> 分割单词,并且扁平化
      .select(explode(split(trim($"value"), "\\s+")).as("word"))
      .groupBy("word").count()
    /*
    root
     |-- word: string (nullable = true)
     |-- count: long (nullable = false)
     */
    // TODO: step4. 将结果输出(ResultTable结果输出,此时需要设置输出模式)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update()) // 表示当词频更新时,再输出
      .format("console")
        .option("numRows", "10")
        .option("truncate", "false")
      // 启动流式应用
      .start()
    // TODO: step5. 启动流式应用后,等待终止
    query.awaitTermination()
    query.stop()
  }
}


目录
相关文章
|
8月前
|
编译器 C++
【c++】入门4
【c++】入门4
61 2
|
3月前
|
C#
C#入门
C#入门
30 0
变分法入门介绍
读完这篇博文你可以了解变分的基本概念,以及使用变分法求解最简泛函的极值。本文没有严密的数学证明,只是感性地对变分法做一个初步了解。
129 0
|
5月前
|
编译器 程序员 C语言
C++入门
C++入门
42 5
|
8月前
|
Kubernetes 开发工具 Docker
K8S 极速入门
K8S 极速入门
101 0
|
Web App开发 移动开发 IDE
laya入门,这一篇应该够了
laya入门,这一篇应该够了
5241 1
|
存储 安全 小程序
c++入门(下)
c++入门(下)
|
编译器 C++
【C++】C++入门(三)
【C++】C++入门(三)
102 0
|
编译器 Linux C语言
【C++】入门(上)
【C++】入门(上)
200 0
【C++】入门(上)
|
程序员
三字棋基本入门
三字棋基本入门
126 0
三字棋基本入门