大数据Spark偏移量管理

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 大数据Spark偏移量管理

1 重构代码

针对前面实现【百度热搜排行榜Top10】实时状态统计应用来说,当应用关闭以后,再次启动(Restart)执行,并没有继续从上次消费偏移量读取数据和获取以前状态信息,而是从最新偏移量(Latest Offset)开始的消费,肯定不符合实际需求,有两种解决方式:


方式一:Checkpoint 恢复

1.当流式应用再次启动时,从Checkpoint 检查点目录恢复,可以读取上次消费偏移量信息和状态相关数据,继续实时处理数据。

2.文档: http://spark.apache.org/docs/2.4.5/streaming-programming-guide.html#checkpointing

方式二:手动管理偏移量

1.用户编程管理每批次消费数据的偏移量,当再次启动应用时,读取上次消费偏移量信息,继续实时处理数据。

2.文档: http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#storing-offsets

在实际生产项目中,常常使用第二种方式【手动管理偏移量】,将偏移量存储到MySQL、Redis或Zookeeper中,接下来讲解两种方式实现,都需要掌握。实际项目开发中,为了代码重构复用和代码简洁性,将【从数据源读取数据、实时处理及结果输出】封装到方法【processData】中,类的结构如下:


  1. Streaming流式应用模板完整代码:
import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * SparkStreaming流式应用模板Template,将从数据源读取数据、实时处理及结果输出封装到方法中。
 */
object StreamingTemplate {
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   *
   * @param ssc 流式上下文StreamingContext实例对象
   */
  def processData(ssc: StreamingContext): Unit = {
    // TODO: 1. 从Kafka Topic实时消费数据
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
      // i.位置策略
      val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
      // ii.读取哪些Topic数据
      val topics = Array("search-log-topic")
      // iii.消费Kafka 数据配置参数
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node1.oldlu.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "group_id_streaming_0002",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      // iv.消费数据策略
      val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
        topics, kafkaParams
      )
      // v.采用消费者新API获取数据
      KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
    }
    // TODO: 2. 词频统计,实时累加统计
    // 2.1 对数据进行ETL和聚合操作
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter { record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新省份销售订单额
        (keyword, latestState)
      }
    )
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // TODO: 3. 统计结果打印至控制台
    stateDStream.foreachRDD { (rdd, time) =>
      val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"BatchTime: $batchTime")
      println("-------------------------------------------")
      if (!rdd.isEmpty()) {
        rdd.coalesce(1).foreachPartition {
          _.foreach(println)
        }
      }
    }
  }
  // 应用程序入口
  def main(args: Array[String]): Unit = {
    // TODO: 构建流式上下文实例对象StreamingContext
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
        // TODO: 设置消费最大数量
        .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      // b. 传递SparkConf和BatchInterval创建流式上下对象
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回实例对象
      context
    }
    // TODO: 从数据源端消费数据,实时处理分析及最后输出
    processData(ssc)
    // TODO: 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

如果流式应用业务复杂,可以将其单独抽取方法,对DStream数据进行处理分析。

2 Checkpoint 恢复

针对Spark Streaming状态应用程序,设置Checkpoint检查点目录,其中存储两种类型数据:

  • 第一类:元数据(Metadata checkpointing),保存定义了 Streaming 计算逻辑

1.应用程序的配置(Configuration): The configuration that was used to create thestreaming application;

2.DStream操作(DStream operations):The set of DStream operations that define thestreaming application;

3.没有完成批处理(Incomplete batches):Batches whose jobs are queued but have notcompleted yet;

第二类:数据(Data checkpointing),保存已生成的RDDs至可靠的存储

1.通常有状态的数据横跨多个batch流的时候,需要做checkpointMetadata Checkpointing 用来恢复 Driver;Data Checkpointing用来容错stateful的数据处理失败的场景 。

当我们再次运行Streaming Application时,只要从Checkpoint 检查点目录恢复,构建StreamingContext应用,就可以继续从上次消费偏移量消费数据。

使用StreamingContext中【getActiveOrCreate】方法构建StreamingContext实例对象,方法声明如下:

若Application为首次重启,将创建一个新的StreamingContext实例;如果Application从失败中重启,从checkpoint目录导入checkpoint数据来重新创建StreamingContext实例。伪代码如下:

修改上述案例代码:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * SparkStreaming实现状态累计实时统计:DStream#mapWithState,当流式应用停止以后,再次启动时:
 * - 其一:继续上次消费Kafka数据偏移量消费数据:MetaData
 * - 其二:继续上次应用停止的状态累加更新状态:State
 */
object StreamingStateCkpt {
  // 检查点目录
  val CKPT_DIR: String = s"datas/streaming/state-ckpt-10002"
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   *
   * @param ssc 流式上下文StreamingContext实例对象
   */
  def processData(ssc: StreamingContext): Unit = {
    // 1. 从Kafka Topic实时消费数据
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
      // i.位置策略
      val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
      // ii.读取哪些Topic数据
      val topics = Array("search-log-topic")
      // iii.消费Kafka 数据配置参数
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node1.oldlu.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "group_id_streaming_0002",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      // iv.消费数据策略
      val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
        topics, kafkaParams
      )
      // v.采用消费者新API获取数据
      KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
    }
    // 2. 词频统计,实时累加统计
    // 2.1 对数据进行ETL和聚合操作
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter { record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新省份销售订单额
        (keyword, latestState)
      }
    )
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // 3. 统计结果打印至控制台
    stateDStream.foreachRDD { (rdd, time) =>
      val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"BatchTime: $batchTime")
      println("-------------------------------------------")
      if (!rdd.isEmpty()) {
        rdd.coalesce(1).foreachPartition {
          _.foreach(println)
        }
      }
    }
  }
  // 应用程序入口
  def main(args: Array[String]): Unit = {
    // TODO: 构建流式上下文实例对象StreamingContext
    /*
    def getActiveOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
    ): StreamingContext
    */
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      CKPT_DIR, // 检查点目录,第一次运行时没有,构建新的,调用如下方法
      // TODO: 第一次运行应用时,一切都是新的,需要创建和指定;非第一次一切都是检查点目录数据恢复
      () => {
        // a. 创建SparkConf对象,设置应用配置信息
        val sparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[3]")
          // TODO: 设置消费最大数量
          .set("spark.streaming.kafka.maxRatePerPartition", "10000")
        // b. 传递SparkConf和BatchInterval创建流式上下对象
        val context = new StreamingContext(sparkConf, Seconds(5))
        // c. 设置检查点目录
        context.checkpoint(CKPT_DIR)
        // d. 读取数据、处理数据和输出数据
        processData(context)
        // e. 返回StreamingContext对象
        context
      }
    )
    // 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

当Streaming Application再次运行时,从Checkpoint检查点目录恢复时,有时有问题,比如修改程序,再次从运行时,可能出现类型转换异常,如下所示:

ERROR Utils: Exception encountered
java.lang.ClassCastException: cannot assign instance of
cn.oldlu.spark.ckpt.StreamingCkptState$$anonfun$streamingProcess$1 to field
org.apache.spark.streaming.dstream.ForEachDStream.org$apache$spark$streaming$dstream$ForEachDStream$$
foreachFunc of type scala.Function2 in instance of org.apache.spark.streaming.dstream.ForEachDStream
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)

原因在于修改DStream转换操作,在检查点目录中存储的数据没有此类的相关代码,所以保存

ClassCastException异常。

此时无法从检查点读取偏移量信息和转态信息,所以SparkStreaming中Checkpoint功能,属于

鸡肋,食之无味,弃之可惜。解决方案:


1)、针对状态信息:当应用启动时,从外部存储系统读取最新状态,比如从MySQL表读取,或

者从Redis读取;

  • 2)、针对偏移量数据:自己管理偏移量,将偏移量存储到MySQL表、Zookeeper、HBase或Redis;

3 MySQL 存储偏移量

此处将偏移量数据存储到MySQL表中,数据库及表的DDL和DML语句如下:

-- 1. 创建数据库的语句
CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
USE db_spark ;
-- 2. 创建表的语句
CREATE TABLE `tb_offset` (
`topic` varchar(255) NOT NULL,
`partition` int NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint NOT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ;
-- 3. 插入数据语句replace
replace into tb_offset (`topic`, `partition`, `groupid`, `offset`) values(?, ?, ?, ?)
--/*
-- replace语句执行时,分以下两种情况:
-- - 情况1:insert
-- 当不存在主键冲突或唯一索引冲突,相当于insert操作
-- - 情况2:delete and insert
-- 当存在主键冲突或唯一索引冲突,相当于delete操作,加insert操作
--*/
-- 4. 查询数据语句select
select * from tb_offset where topic in ('xx', 'yy') AND groupid = 'gid001' ;
select * from tb_offset where topic in (?) and groupid = ? ;

3.1 编写工具类

工具类OffsetsUtils从MySQL数据库表中读取消费的偏移量信息和保存最近消费的偏移量值,示意图如下所示:

工 具 类 中 包 含 如 何 保 存 偏 移 量 【 saveOffsetsToTable 】 和 读 取 偏 移 量

【getOffsetsToMap】两个函数,相关声明如下:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
/**
 * 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
 */
object OffsetsUtils {
  /**
   * 依据Topic名称和消费组GroupId获取各个分区的偏移量
   *
   * @param topicNames Topics名称
   * @param groupId    消费组ID
   */
  def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] = {
    null
  }
  /**
   * 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
   *
   * @param offsetRanges Topic中各个分区消费偏移量范围
   * @param groupId      消费组ID
   */
  def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
  }
}

依据业务实现工具类中方法,主要考察就是对MySQL数据库表数据的操作:从表中读

取数据和向表写入数据,完整代码如下:

package cn.oldlu.spark.offset
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.mutable
/**
 * 将消费Kafka Topic偏移量数据存储MySQL数据库,工具类用于读取和保存偏移量数据
 * 表的创建语句:
 * CREATE TABLE `tb_offset` (
 * `topic` varchar(255) NOT NULL,
 * `partition` int NOT NULL,
 * `groupid` varchar(255) NOT NULL,
 * `offset` bigint NOT NULL,
 * PRIMARY KEY (`topic`,`partition`,`groupid`)
 * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
 */
object OffsetsUtils {
  /**
   * 依据Topic名称和消费组GroupId获取各个分区的偏移量
   *
   * @param topicNames Topics名称
   * @param groupId    消费组ID
   */
  def getOffsetsToMap(topicNames: Array[String], groupId: String): Map[TopicPartition, Long] = {
    // 构建集合
    val map: mutable.Map[TopicPartition, Long] = scala.collection.mutable.Map[TopicPartition, Long]()
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    var result: ResultSet = null
    try {
      // a. 加载驱动类
      Class.forName("com.mysql.cj.jdbc.Driver")
      // b. 获取连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
          ode = true", //
          "root", //
          "123456" //
          )
          // c. 编写SQL,获取PreparedStatement对象
          val topicNamesStr = topicNames.map (topicName => s"\'$topicName\'").mkString(", ")
      val querySQL =
        s"""
           |SELECT
           | `topic`, `partition`, `groupid`, `offset`
           |FROM
           | db_spark.tb_offset
           |WHERE
           | groupid = ? AND topic in ($topicNamesStr)
           |""".stripMargin
      pstmt = conn.prepareStatement(querySQL)
      pstmt.setString(1, groupId)
      // d. 查询数据
      result = pstmt.executeQuery()
      // e. 遍历获取值
      while (result.next()) {
        val topicName = result.getString("topic")
        val partitionId = result.getInt("partition")
        val offset = result.getLong("offset")
        // 加入集合中
        map += new TopicPartition(topicName, partitionId) -> offset
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (null != result) result.close()
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
    // 返回集合,转换为不可变的
    map.toMap
  }
/**
 * 保存Streaming每次消费Kafka数据后最新偏移量到MySQL表中
 *
 * @param offsetRanges Topic中各个分区消费偏移量范围
 * @param groupId      消费组ID
 */
def saveOffsetsToTable(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
  // 声明变量
  var conn: Connection = null
  var pstmt: PreparedStatement = null
  try {
    // a. 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver")
    // b. 获取连接
    conn = DriverManager.getConnection(
      "jdbc:mysql://node1.oldlu.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnic
        ode = true", //
        "root", //
        "123456" //
        )
        // c. 编写SQL,获取PreparedStatement对象
        val insertSQL = "replace into db_spark.tb_offset (`topic`, `partition`, `groupid`, `offset`) values (?, ?, ?, ?)"
        pstmt = conn.prepareStatement (insertSQL)
        // d. 设置参数
        offsetRanges.foreach {offsetRange =>
        pstmt.setString (1, offsetRange.topic)
        pstmt.setInt (2, offsetRange.partition)
        pstmt.setString (3, groupId)
        pstmt.setLong (4, offsetRange.untilOffset)
        // 加入批次
        pstmt.addBatch ()
        }
        // e. 批量插入
        pstmt.executeBatch ()
        } catch {
        case e: Exception => e.printStackTrace ()
        } finally {
        if (null != pstmt) pstmt.close ()
        if (null != conn) conn.close ()
        }
      }
        def main (args: Array[String] ): Unit = {
        /*
        saveOffsetsToTable(
        Array(
        OffsetRange("xx-tp", 0, 11L, 100L),
        OffsetRange("xx-tp", 1, 11L, 100L),
        OffsetRange("yy-tp", 0, 10L, 500L),
        OffsetRange("yy-tp", 1, 10L, 500L)
        ),
        "group_id_00001"
        )
        */
        //getOffsetsToMap(Array("xx-tp"), "group_id_00001").foreach(println)
        }
    }

3.2 加载和保存偏移量

从Kafka Topic消费数据时,首先从MySQL数据库加载偏移量,如果有值,使用如下函数:

从Kafka Topic消费数据时,直接获取的DStream中每批次RDD都是KafkaRDD,可以获取数据偏移量范围信息OffsetRanges。

修改前面实时订单消费额统计代码,自己管理消费偏移量,存储到MySQL表中,代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
 * SparkStreaming从Kafka Topic实时消费数据,手动管理消费偏移量,保存至MySQL数据库表
 */
object StreamingManagerOffsets {
  /**
   * 抽象一个函数:专门从数据源读取流式数据,经过状态操作分析数据,最终将数据输出
   */
  def processData(ssc: StreamingContext): Unit = {
    // 1. 从Kafka Topic实时消费数据
    val groupId: String = "group_id_10001" // 消费者GroupID
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = {
      // i.位置策略
      val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
      // ii.读取哪些Topic数据
      val topics = Array("search-log-topic")
      // iii.消费Kafka 数据配置参数
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node1.oldlu.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> groupId,
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      // iv.消费数据策略
      // TODO: 从MySQL数据库表中获取偏移量信息
      val offsetsMap: Map[TopicPartition, Long] = OffsetsUtils.getOffsetsToMap(topics, groupId)
      val consumerStrategy: ConsumerStrategy[String, String] = if (offsetsMap.isEmpty) {
        // TODO: 如果第一次消费topic数据,此时MySQL数据库表中没有偏移量信息, 从最大偏移量消费数据
        ConsumerStrategies.Subscribe(topics, kafkaParams)
      } else {
        // TODO: 如果不为空,指定消费偏移量
        ConsumerStrategies.Subscribe(topics, kafkaParams, offsetsMap)
      }
      // v.采用消费者新API获取数据
      KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
    }
    // TODO: 其一、创建空Array数组,指定类型
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    // 2. 词频统计,实时累加统计
    // 2.1 对数据进行ETL和聚合操作
    val reduceDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
      // TODO:其二、从KafkaRDD中获取每个分区数据对应的偏移量信息
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val reduceRDD: RDD[(String, Int)] = rdd
        // 过滤不合格的数据
        .filter { record =>
          val message: String = record.value()
          null != message && message.trim.split(",").length == 4
        }
        // 提取搜索词,转换数据为二元组,表示每个搜索词出现一次
        .map { record =>
          val keyword: String = record.value().trim.split(",").last
          keyword -> 1
        }
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item) // TODO: 先聚合,再更新,优化
      // 返回
      reduceRDD
    }
    // 2.2 使用mapWithState函数状态更新, 针对每条数据进行更新状态
    val spec: StateSpec[String, Int, Int, (String, Int)] = StateSpec.function(
      // (KeyType, Option[ValueType], State[StateType]) => MappedType
      (keyword: String, countOption: Option[Int], state: State[Int]) => {
        // a. 获取当前批次中搜索词搜索次数
        val currentState: Int = countOption.getOrElse(0)
        // b. 从以前状态中获取搜索词搜索次数
        val previousState = state.getOption().getOrElse(0)
        // c. 搜索词总的搜索次数
        val latestState = currentState + previousState
        // d. 更行状态
        state.update(latestState)
        // e. 返回最新省份销售订单额
        (keyword, latestState)
      }
    )
    // 设置状态的初始值,比如状态数据保存Redis中,此时可以从Redis中读取
    //spec.initialState(ssc.sparkContext.parallelize(List("罗志祥" -> 123, "裸海蝶" -> 342)))
    // 调用mapWithState函数进行实时累加状态统计
    val stateDStream: DStream[(String, Int)] = reduceDStream.mapWithState(spec)
    // 3. 统计结果打印至控制台
    stateDStream.foreachRDD { (rdd, time) =>
      val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"BatchTime: $batchTime")
      println("-------------------------------------------")
      if (!rdd.isEmpty()) {
        rdd.coalesce(1).foreachPartition {
          _.foreach(println)
        }
      }
      // TODO: 其三、保存每批次数据偏移量到MySQL数据库表中
      OffsetsUtils.saveOffsetsToTable(offsetRanges, groupId)
    }
  }
  // 应用程序入口
  def main(args: Array[String]): Unit = {
    // 1). 构建流式上下文StreamingContext实例对象
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate(
      () => {
        // a. 创建SparkConf对象
        val sparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[3]")
        // b. 创建StreamingContext对象
        val context = new StreamingContext(sparkConf, Seconds(5))
        // c. 返回
        context
      }
    )
    ssc.checkpoint(s"datas/streaming/offsets-${System.nanoTime()}")
    // 2). 实时消费Kafka数据,统计分析
    processData(ssc)
    // 3). 启动流式应用,等待终止(人为或程序异常)
    ssc.start()
    ssc.awaitTermination() // 流式应用启动以后,一直等待终止,否则一直运行
    // 无论是否异常最终关闭流式应用(优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

经过测试发现完全没有问题的,代码可以进一步优化,提高性能:由于每批次数据结果RDD输出以后,都需要向MySQL数据库表更新偏移量数据,频繁连接数据库,建议构建数据库连接池,每次从池子中获取连接。

实际项目中将偏移量保存至Zookeeper上或者Redis中,原因如下:


1)、保存Zookeeper上:方便使用Kafka 监控工具管理Kafka 各个Topic被消费信息;

2)、保存Redis上:从Redis读取数据和保存数据很快,基于内存数据库;


5460de1e7c8c44b49d6bc53ce0723e8c.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
142 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
92 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
115 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
84 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
2月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
63 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
120 0
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
347 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
53 2