大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成了如下的内容:


Spark Streaming 与 Kafka

08和10版本的接口对比

Producer、KafkaDStream 实例代码

3e9e311d2b81353fbb8ce9b0494fe66d_f3090593eb65433a8064e9fe6e969f72.png Offset 管理

Spark Streaming 集成Kafka,允许从Kafka中读取一个或者多个Topic的数据,一个Kafka Topic包含一个或者多个分区,每个分区中的消息顺序存储,并使用offset来标记消息位置,开发者可以在Spark Streaming应用中通过offset来控制数据的读取位置。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将Offset持久化保存,该消息就会丢失,那么Spark Streaming就没有办法从上次停止或保存的位置继续消费Kafka中的消息。


获取偏移量(Obtaining Offsets)

Spark Streaming 与 Kafka 整合时,允许获取其消费的Offset,具体方法如下:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
  println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

注意:对 HashOffsetRanges的类型转换只有在对 createDirectStream 调用的第一个方法中完成时才会成功,而不是在随后的方法链中。RDD分区和Kafka分区之间的对应关系在Shuffle或重分区后会丧失,如 reduceByKey 或 window。


存储偏移量(Storing Offsets)

在 Spark Streaming程序失败的情况下,Kafka交付语义取决于 如何、何时存储偏移量,Spark输出操作的语义为 at-least-once。

如果要实现EOS语义(Exactly Once Semantics),必须在幂等的输出之后存储偏移量或者将存储偏移量与输出放在一个事务中。可以按照增加可靠性(和代码复杂度)的顺序使用以下选项来存储偏移量。


CheckPoint

CheckPoint 是 Spark Streaming 运行过程中的元数据和每RDDs的数据状态保存到一个持久化系统中,当然这里面也包含了Offset,一般是 HDFS、S3,如果应用程序或者集群挂了,可以迅速恢复。


如果 Spark Streaming程序代码变了,重新打包执行就会出现反序列化异常的问题。

这是因为 CheckPoint 首次持久化时会将整个Jar包序列化,以便重启时恢复,重新打包后,新旧逻辑不一致,导致报错。


要解决这个问题,只能将HDFS上的CheckPoint文件删除,但这样也会同时删除Kafka的Offset信息。


Kafka

默认情况下,消费者定期自动提交偏移量,它将偏移量存储子啊一个特殊的Kafka主题中(_consumer_offsets),但在某些情况下,这将导致问题,因为消息可能已经被消费者从Kafka拉取了,但是还没有处理。


可以将 enable.auto.commit 设置为 false,在 Spark Streaming程序输出结果后,手动提交偏移。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 在输出操作完成之后,手工提交偏移量;此时将偏移量提交到 Kafka 的消息队列中
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与 HasOffsetRanges 一样,只有在 createDirectStream 的结果上调用时,转换到 CanCommitOffsets 才会成功,而不是在转换之后,commitAsync调用是线程安全的,但必须在输出之后执行。


自定义存储

Offsets可以通过多种方式来管理,但是一般来说遵循下面的步骤:


在 DStream 初始化的时候,需要指定每个分区的Offsets用于从指定位置读取数据

读取并处理消息

处理完之后存储结果数据

用虚线存储和提交 Offset,敲掉用户可能会执行一系列操作来满足他们更加严格的语义要求。这包括幂等操作和通过原子操作的方式存储Offset

将 Offsets 保存在外部持久化数据库,如 HBase、Kafka、HDFS、ZooKeeper、Redis、MySQL

6287541f1fab94d316a45b60efc5fb78_e7d86b6e0e4444a18ab8d10d2e3027ae.png

可以将 Offsets 存储到 HDFS 中,但这并不是一个好的方案,因为 HDFS 的延迟很高,此外将每批数据的 Offset 存储到 HDFS 中还会带来小文件过大的问题。

可以将 Offsets 存储到 ZK 中,但是将ZK作为存储,也并不是一个明智的选择,同时ZK也不适合频繁的读写操作

Redis管理Offset

要想将 Offset 保存到外部的存储中,关键实现以下几个功能:


Spark Streaming程序启动的时候,从外部存储获取保存的Offsets(执行一次)

在 foreachRDD中,每个批次数据处理之后,更新外部存储的Offsets(执行多次)

Redis管理Offsets:

数据结构选择:Hash、Key、Field、Value

Key:kafka:topic:topicName:groupId

Value:offset

从 Redis 中获取到保存的 Offsets

消费数据后将 Offsets 保存到 Redis

自定义Offsets:Kafka读数据 处理完打印Offsets


package icu.wzk

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object kafkaDStream2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream2")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters("wzkicu")
    val topics: Array[String] = Array("spark_streaming_test01")

        // 从指定位置获取Kafka数据
    val offsets: collection.Map[TopicPartition, Long] = Map(
      new TopicPartition("spark_streaming_test01",0) -> 100,
      // 我这里只有一个分区 你可以多创建几个
      // new TopicPartition("spark_streaming_test01", 1) -> 200,
      // new TopicPartition("spark_streaming_test01", 2) -> 300,
    )

    // 从Kafka中获取数据
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream 输出
    dstream.foreachRDD  {
      (rdd, time) => {
        println(s"=========== rdd.count = ${rdd.count()}, time = $time ==============")
      }
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition {
          iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"${o.topic}, ${o.partition}, ${o.fromOffset}, ${o.untilOffset}")
        }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

运行结果如下图所示:

目录
相关文章
|
28天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
82 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
29天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
51 6
|
27天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
86 2
|
28天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
65 1
|
28天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
29天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
51 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
51 3
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
2月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
27 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方