sparkstreaming遇到的问题

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
云原生网关 MSE Higress,422元/月
简介: sparkstreaming遇到的问题

这篇文章介绍sparkstreaming对接kafka时遇到的两个offset的问题,首选我们介绍下offset的存储。

sparkstreaming offset存储

sparkstreaming采用kafkaUtils的createDirectStream()处理kafka数据的方式,会直接从kafka的broker的分区中读取数据,跳过zookeeper,并且没有receiver,是spark的task直接对接kafka topic partition。

由于这种方式没有经过ZK,topic的offset没有保存,当job重启后只能从最新的offset开始消费数据,造成重启过程中的消息丢失。

如果spark自动提交,会在sparkstreaming刚运行时就立马提交offset,如果这个时候Spark streaming消费信息失败了,那么offset也就错误提交了。

所以要在sparkstreaming中实现exactly-once恰好一次,必须

1.手动提交偏移量

2.处理完业务数据后再提交offset

手动维护偏移量 需设置kafka参数enable.auto.commit改为false

手动维护提交offset有两种选择:

1.处理完业务数据后手动提交到Kafka

    2.处理完业务数据后手动提交到本地库 如MySql、HBase

也可以将offset提交到zookeeper,但是经过我们测试,发现zookeeper不适合存储大量数据,在大数据量的情况下很容易崩溃。

我们来看下如何将offset存储到mysql中:

// 处理完 业务逻辑后,手动提交offset偏移量到本地Mysql中
stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

HBase中也是类似的

inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count()
      //save the offsets to HBase  批量处理把数据存储到Hbase当中
      saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
    })
    ssc
  }
  /**
    * 对数据进行处理
    * @param message
    * @return
    */
  def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
    message
  }
  /*
Save Offsets into HBase
 */
  def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange], hbaseTableName: String,
                  batchTime: org.apache.spark.streaming.Time) = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
    val put = new Put(rowKey.getBytes)
    for (offset <- offsetRanges) {
      put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
        Bytes.toBytes(offset.untilOffset.toString))
    }
    table.put(put)
    conn.close()
  }
  /*
Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
  - CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions from
    Zookeeper and for each partition returns the last committed offset as 0
  - CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
    committed offsets for each topic-partition is returned as is from HBase.
  - CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
    committed offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,
    function returns last committed offsets as 0
 */
  def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
                              zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.addResource("src/main/resources/hbase-site.xml")
    val zkUrl = zkQuorum + "/" + zkRootDir
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
    val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
    //Connect to HBase to retrieve last committed offsets
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
    val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
    val scan = new Scan()
    val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
    val result = scanner.next()
    var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
    if (result != null) {
      //If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
      hbaseNumberOfPartitionsForTopic = result.listCells().size()
    }
    val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
    if (hbaseNumberOfPartitionsForTopic == 0) {
      // initialize fromOffsets to beginning
      for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
      // handle scenario where new partitions have been added to existing kafka topic
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
      for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
      }
    } else {
      //initialize fromOffsets from last run
      for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
        val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
        fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
      }
    }
    scanner.close()
    conn.close()
    fromOffsets.toMap
  }
}

第一个问题 numRecords must not be negative

当*删除已经使用过的kafka topic,然后新建同名topic*,用spark streaming Kakfa createDirectStream方法时出现了"numRecords must not be negative"异常,这个异常是不合法的参数异常,RDD的记录数目必须不能是负数

异常分析

首先我们看异常打印出现问题的位置

org.apache.spark.streaming.scheduler.StreamInputInfo.InputInfoTracker的第38行

/**
 * :: DeveloperApi ::
 * Track the information of input stream at specified batch time.
 *
 * @param inputStreamId the input stream id
 * @param numRecords the number of records in a batch
 * @param metadata metadata for this batch. It should contain at least one standard field named
 *                 "Description" which maps to the content that will be shown in the UI.
 */
@DeveloperApi
case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
  require(numRecords >= 0, "numRecords must not be negative")
  def metadataDescription: Option[String] =
    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}

代码38行,判断了numRecords是否大于等于0,当不满足条件时抛出异常,可判断此时numRecords<0。 numRecords的解释: numRecords: the number of records in a batch 应该是当前rdd中records 数目计算出了问题。

offsetRanges的计算逻辑

offsetRanges的定义

offsetRanges: offset ranges that define the Kafka data belonging to this RDD

在KafkaRDDPartition 40行找到kafka partition offsetRange的计算逻辑:

def count(): Long = untilOffset - fromOffset`
`fromOffset: per-topic/partition Kafka offset defining the (inclusive) starting point of the batch`
`untilOffset: per-topic/partition Kafka offset defining the (inclusive) ending point of the batch

fromOffset来自zk中保存; untilOffset通过DirectKafkaInputDStream第145行:

val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

计算得到,计算过程得到最新的offset,然后使用spark.streaming.kafka.maxRatePerPartition做clamp,得到允许的最大untilOffsets,##而此时新建的topic,如果topic中没有数据,untilOffsets应该为0##

原因总结

当删除一个topic时,zk中的offset信息并没有被清除,因此KafkaDirectStreaming再次启动时仍会得到旧的topic offset为old_offset,作为fromOffset。 当新建了topic后,使用untiloffset计算逻辑,得到untilOffset为0(如果topic已有数据则>0); 再次被启动的KafkaDirectStreaming Job通过异常的计算逻辑得到的rdd numRecords值为可计算为: numRecords = untilOffset - fromOffset(old_offset)\当untilOffset < old_offset时,此异常会出现,对于新建的topic这种情况的可能性很大

解决方法

根据以上分析,可在确定KafkaDirectStreaming 的fromOffsets时判断fromOffset与untiloffset的大小关系,当untilOffset < fromOffset时,矫正fromOffset为offset初始值0。

从zk获取topic/partition 的fromOffset利用SimpleConsumer获取每个partiton的lastOffset(untilOffset )判断每个partition lastOffset与fromOffset的关系当lastOffset < fromOffset时,将fromOffset赋值为0 通过以上步骤完成fromOffset的值矫正。

矫正offset的核心代码如下:

/** 以下 矫正 offset */
    // 得到Topic/partition 的lastOffsets
    Map&lt;TopicAndPartition, Long&gt; topicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            topicList, "my.group.id");
    // 遍历每个Topic.partition
    for (Map.Entry&lt;TopicAndPartition, Long&gt; topicAndPartitionLongEntry : fromOffsets.entrySet()) {
      // fromOffset &gt; lastOffset时
      if (topicAndPartitionLongEntry.getValue() &gt;
          topicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey())) {
         //矫正fromoffset为offset初始值0
        topicAndPartitionLongEntry.setValue(0L);
      }
    }
    /** 以上 矫正 offset */

第二个问题 Offsets out of range

Kafka DirectStream 读取topic中数据做测试,停止了一段时间,再次启动时出现了kafka.common.OffsetOutOfRangeException

异常如下:

0/12/16 11:08:33 WARN TaskSetManager: Lost task 2.0 in stage 105.0 (TID 85, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {register_topic-5=23550}

异常分析

offset越界分为头越界和尾越界,头部越界是zookeeper中保存的offset在topic中仍然存在的最老message的offset之前时(zk_offset < earliest_offset);尾越界是zookeeper中保存的offset在topic中最新message的offset之后(zk_offset > last_offset),我们在前面遇到了这个问题,并做了处理,因此这个问题应该是头部越界导致。

经过分析,我们有一段时间没有消费topic中的数据了,大概已经过了七天,而kafka broker中我们设置的log保存时间为七天


因此,应该是kafka 中未被消费的数据被broker清除了,使得从zookeeper中读取到的offset落在仍存在的最老message offset的左侧,本来合法的offset变得不非法了。

解决方法

首先想到的方法就是 streaming job要及时消费掉topic中的数据,消费延迟不得大于log.retention.ms的配置。 但是更好的办法是在遇到该问题时,依然能让job正常运行,因此就需要在发现zk_offset时矫正zk_offset为合法值

矫正offset的核心的代码如下:

/** 以下 矫正 offset */
    // lastest offsets
    Map&lt;TopicAndPartition, Long&gt; lastestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));
    // earliest offsets
    Map&lt;TopicAndPartition, Long&gt; earliestTopicAndPartitionLongMap =
        KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
            Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));
    for (Map.Entry&lt;TopicAndPartition, Long&gt; topicAndPartitionLongEntry : fromOffsets.entrySet()) {
      long zkOffset = topicAndPartitionLongEntry.getValue();
      long lastestOffset = lastestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
      // zkoffset 不在可用message offset区间内
      if (zkOffset &gt; lastestOffset || zkOffset &lt; earliestOffset) {
        // set offset = earliestOffset
        logger.warn("矫正offset: " + zkOffset +" -&gt; "+ earliestOffset);
        topicAndPartitionLongEntry.setValue(earliestOffset);
      }
    }
相关文章
|
消息中间件 缓存 算法
kafka(三)
kafka(三)
|
资源调度 分布式计算 Hadoop
Hadoop Yarn 核心调优参数
这是一个关于测试集群环境的配置说明,包括3台服务器(master, slave1, slave2)运行CentOS 7.5,每台有4核CPU和4GB内存。集群使用Hadoop 3.1.3,JDK1.8。Yarn核心配置涉及调度器选择、ResourceManager线程数、节点检测、逻辑处理器使用、核心转换乘数、NodeManager内存和CPU设置,以及容器的内存和CPU限制。配置完成后,需要重启Hadoop并检查yarn配置。
403 4
|
11月前
|
SQL 存储 关系型数据库
Mysql并发控制和日志
通过深入理解和应用 MySQL 的并发控制和日志管理技术,您可以显著提升数据库系统的效率和稳定性。
382 10
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
1150 1
|
数据可视化 前端开发 JavaScript
【Echarts大屏】数字化巡检可视化大屏(附原码一键复制)
【Echarts大屏】数字化巡检可视化大屏(附原码一键复制)
|
NoSQL Redis
RedisTemplate.opsForHash()用法简介并举例
RedisTemplate.opsForHash()用法简介并举例
1741 1
python并发执行request请求
选择哪种并发方式取决于我们的具体需求。对于I/O密集型任务,多线程或异步I/O通常是更好的选择;对于CPU密集型任务,多进程可能是更好的选择。此外,异步I/O通常比多线程具有更好的性能,特别是在高并发的网络应用中。
|
JSON Java fastjson
Spring Boot返回Json数据及数据封装
本文详细介绍了如何在Spring Boot项目中处理JSON数据的传输 Spring Boot默认使用Jackson作为JSON处理器,并通过`spring-boot-starter-web`依赖自动包含相关组件。文章还展示了如何配置Jackson处理null值,使其转换为空字符串。此外,文章比较了Jackson和FastJson的特点,并提供了FastJson的配置示例,展示了如何处理null值以适应不同应用场景。
|
存储 监控 安全
Java基于物联网技术的智慧工地云管理平台源码 依托丰富的设备接口标准库,快速接入工地现场各类型设备
围绕施工安全、质量管理主线,通过物联感知设备全周期、全覆盖实时监测,将管理动作前置,实现从事后被动补救到事前主动预防的转变。例如塔吊运行监测,超重预警,升降机、高支模等机械设备危险监控等,通过安全关键指标设定,全面掌握现场安全情况,防患于未然。
364 5
|
消息中间件 存储 监控
深度解析 Kafka 中的 Offset 管理与最佳实践
Kafka 中的 Offset(偏移量)是消息处理的关键元素,对于保证消息传递的可靠性和一致性至关重要。本篇博客将深度解析 Kafka 中的 Offset 管理机制,并提供丰富的示例代码,让你更全面地理解 Offset 的原理、使用方法以及最佳实践。