大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成了如下的内容:


Spark Streaming Kafka

自定义管理Offset Scala代码实现

3e9e311d2b81353fbb8ce9b0494fe66d_f3090593eb65433a8064e9fe6e969f72.png Offset 管理

Spark Streaming 集成Kafka,允许从Kafka中读取一个或者多个Topic的数据,一个Kafka Topic包含一个或者多个分区,每个分区中的消息顺序存储,并使用offset来标记消息位置,开发者可以在Spark Streaming应用中通过offset来控制数据的读取位置。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性是非常重要的,如果在应用停止或者报错退出之前将Offset持久化保存,该消息就会丢失,那么Spark Streaming就没有办法从上次停止或保存的位置继续消费Kafka中的消息。


Spark Streaming 与 Kafka 的集成

Spark Streaming 可以通过 KafkaUtils.createDirectStream 直接与 Kafka 集成。这种方式不会依赖于 ZooKeeper,而是直接从 Kafka 分区中读取数据。

在这种直接方式下,Spark Streaming 依赖 Kafka 的 API 来管理和存储消费者偏移量(Offsets),默认情况下偏移量保存在 Kafka 自身的 __consumer_offsets 主题中。


使用 Redis 管理 Offsets

Redis 作为一个高效的内存数据库,常用于存储 Spark Streaming 中的 Kafka 偏移量。

通过手动管理偏移量,你可以在每批次数据处理后,将当前批次的 Kafka 偏移量存储到 Redis 中。这样,在应用程序重新启动时,可以从 Redis 中读取最后处理的偏移量,从而从正确的位置继续消费 Kafka 数据。


实现步骤

从 Redis 获取偏移量

应用启动时,从 Redis 中读取上次处理的偏移量,并从这些偏移量开始消费 Kafka 数据。


处理数据

通过 Spark Streaming 处理从 Kafka 消费到的数据。


保存偏移量到 Redis

每处理完一批数据后,将最新的偏移量存储到 Redis 中。这样,如果应用程序崩溃或重启,可以从这个位置继续消费。


自定义Offsets:根据Key从Redis获取Offsets 处理完更新Redis

添加依赖

<!-- jedis -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.9.0</version>
</dependency>

服务器上我们需要有:

Redis服务启动

Kafka服务启动

编写代码,实现的主要逻辑如下所示:

package icu.wzk

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("args").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream3")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val groupId: String = "wzkicu"
    val topics: Array[String] = Array("spark_streaming_test01")
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)

    // 从 Kafka 获取 Offsets
    val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)

    // 创建 DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream 转换&输出
    dstream.foreachRDD {
      (rdd, time) =>
        if (!rdd.isEmpty()) {
          // 处理消息
          println(s"====== rdd.count = ${rdd.count()}, time = $time =======")
          // 将 Offsets 保存到 Redis
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
        }
    }

    ssc.start()
    ssc.awaitTermination()

  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}

代码中我们封装了一个工具类:

package icu.wzk

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsRedisUtils {

  private val config = new JedisPoolConfig
  private val redisHost = "h121.wzk.icu"
  private val redisPort = 6379

  config.setMaxTotal(30)
  config.setMaxIdle(10)

  private val pool= new JedisPool(config, redisHost, redisPort, 10000)
  private val topicPrefix = "kafka:topic"

  private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String = s"$prefix:$topic:$groupId"
  private def getRedisConnection: Jedis = pool.getResource

  // 从Redis中获取Offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map {
      topic =>
        import scala.collection.JavaConverters._
        jedis.hgetAll(getKey(topic, groupId))
          .asScala
          .map {
            case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
          }
    }
    jedis.close()
    offsets.flatten.toMap
  }

  // 将 Offsets 保存到 Redis
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection
    ranges
      .map(range => (range.topic, range.partition -> range.untilOffset))
      .groupBy(_._1)
      .map {
        case (topic, buffer) => (topic, buffer.map(_._2))
      }
      .foreach {
        case (topic, partitionAndOffset) =>
          val offsets: Array[(String, String)] = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
          import scala.collection.JavaConverters._
          jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
      }

    jedis.close()
  }

}

我们启动后,如图所示:

这里我使用Redis查看当前的存储情况:

可以看到当前已经写入了,我们继续启动 KafkaProducer工具,继续写入数据。

可以看到,已经统计到数据了。

我们继续查看当前的Redis中的数据:

目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
187 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
19天前
|
消息中间件 运维 大数据
道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展
阿里云云消息队列 Kafka 版 Serverless 系列凭借其卓越的弹性能力,为道旅科技提供了灵活高效的数据流处理解决方案。无论是应对突发流量还是规划长期资源需求,该方案均能帮助企业实现资源动态调整和成本优化,同时保障业务的高可用性和连续性。
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
120 6
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
145 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
124 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
82 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
78 1
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
63 3
|
3月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
53 2