This article covers two offset-related problems we ran into when connecting Spark Streaming to Kafka. First, a quick look at how offsets are stored.
Spark Streaming offset storage
When Spark Streaming consumes Kafka through KafkaUtils.createDirectStream(), it reads data directly from the partitions on the Kafka brokers, bypassing ZooKeeper and using no receiver: Spark tasks map directly onto Kafka topic partitions.
Because this path does not go through ZooKeeper, the topic offsets are not saved there; after a job restart, consumption can only resume from the latest offset, so any messages produced while the job was down are lost.
If Spark auto-commits, the offsets are committed as soon as the Spark Streaming batch starts; if the batch then fails to process the messages, the offsets have already been committed incorrectly.
So to achieve exactly-once semantics in Spark Streaming, you must:
1. commit offsets manually;
2. commit the offsets only after the business data has been processed.
Maintaining offsets manually requires setting the Kafka parameter enable.auto.commit to false.
There are two options for committing offsets manually:
1. after processing the business data, commit them back to Kafka (see the sketch below);
2. after processing the business data, commit them to an external store such as MySQL or HBase.
Offsets can also be committed to ZooKeeper, but in our tests ZooKeeper is not suited to storing large volumes of data and crashes easily under heavy load.
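For option 1 (committing back to Kafka), a minimal sketch with the spark-streaming-kafka-0-10 API might look like the following. The broker list, topic name and group id are placeholders, ssc is an existing StreamingContext, and commitAsync only queues the commit, which Kafka performs on a later batch:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Placeholder Kafka parameters; auto commit is disabled so offsets are only
// committed after the batch has been processed successfully.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my.group.id",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my_topic"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // commit only after the business logic has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}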
Let's look at how to store offsets in MySQL:
// after the business logic has completed, manually commit the offsets to MySQL
stream.foreachRDD(rdd => {
  val sqlProxy = new SqlProxy()
  val client = DataSourceUtil.getConnection
  try {
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (or <- offsetRanges) {
      sqlProxy.executeUpdate(client,
        "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
        Array(groupid, or.topic, or.partition.toString, or.untilOffset))
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    sqlProxy.shutdown(client)
  }
})
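For the offsets to survive a restart they also need to be read back when the job starts. The following is a minimal sketch using plain JDBC against the same offset_manager table; the JDBC URL and credentials are placeholders (the real job would obtain its connection from DataSourceUtil), and groupid is the same consumer group id as in the save snippet above:

import java.sql.DriverManager
import org.apache.kafka.common.TopicPartition

// Placeholder JDBC settings
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/streaming", "user", "password")
val fromOffsets = scala.collection.mutable.Map[TopicPartition, Long]()
try {
  val stmt = conn.prepareStatement(
    "select topic, `partition`, untilOffset from `offset_manager` where groupid = ?")
  stmt.setString(1, groupid)
  val rs = stmt.executeQuery()
  while (rs.next()) {
    // resume each partition from the offset written after the last successful batch
    val tp = new TopicPartition(rs.getString("topic"), rs.getString("partition").toInt)
    fromOffsets += (tp -> rs.getLong("untilOffset"))
  }
} finally {
  conn.close()
}
// fromOffsets.toMap can then be passed to ConsumerStrategies.Subscribe as the starting offsets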
The approach for HBase is similar:
inputDStream.foreachRDD((rdd, batchTime) => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
  val newRDD = rdd.map(message => processMessage(message))
  newRDD.count()
  // after the batch has been processed, save the offsets to HBase
  saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime)
})
ssc
}

/**
 * Process a single record (identity here; replace with real business logic)
 * @param message
 * @return
 */
def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
  message
}

/* Save offsets into HBase */
def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
  val put = new Put(rowKey.getBytes)
  for (offset <- offsetRanges) {
    put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
      Bytes.toBytes(offset.untilOffset.toString))
  }
  table.put(put)
  conn.close()
}

/* Returns last committed offsets for all the partitions of a given topic from HBase in the following cases.
   - CASE 1: The Spark Streaming job is started for the first time. The function gets the number of topic
     partitions from ZooKeeper and for each partition returns the last committed offset as 0.
   - CASE 2: Spark Streaming is restarted and there are no changes to the number of partitions in the topic.
     Last committed offsets for each topic-partition are returned as-is from HBase.
   - CASE 3: Spark Streaming is restarted and the number of partitions in the topic has increased. For old
     partitions, last committed offsets are returned as-is from HBase; for newly added partitions the
     function returns 0. */
def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
                            zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.addResource("src/main/resources/hbase-site.xml")
  val zkUrl = zkQuorum + "/" + zkRootDir
  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size

  // Connect to HBase to retrieve last committed offsets
  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
  val scan = new Scan()
  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
  val result = scanner.next()

  // number of partitions discovered for the topic in HBase; 0 means nothing stored yet
  var hbaseNumberOfPartitionsForTopic = 0
  if (result != null) {
    // if the scanner returned a row, the number of cells equals the number of partitions stored for this topic
    hbaseNumberOfPartitionsForTopic = result.listCells().size()
  }

  val fromOffsets = collection.mutable.Map[TopicPartition, Long]()
  if (hbaseNumberOfPartitionsForTopic == 0) {
    // CASE 1: initialize fromOffsets to the beginning
    for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
    }
  } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
    // CASE 3: new partitions have been added to an existing Kafka topic
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
    }
  } else {
    // CASE 2: initialize fromOffsets from the last run
    for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
      fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
    }
  }
  scanner.close()
  conn.close()
  fromOffsets.toMap
}
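The map returned by getLastCommittedOffsets is then used as the starting position of the direct stream. A minimal sketch (kafkaParams, zkQuorum and the other arguments are placeholders from the code above; ssc is an existing StreamingContext):

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// read the last committed offsets from HBase (falling back to 0 for new partitions)
val fromOffsets = getLastCommittedOffsets(topic, consumerGroupID, hbaseTableName,
  zkQuorum, zkRootDir, sessionTimeout, connectionTimeOut)

// start the direct stream from exactly those offsets
val inputDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams, fromOffsets))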
Problem 1: numRecords must not be negative
After deleting a Kafka topic that had already been consumed and then recreating a topic with the same name, calling Spark Streaming's KafkaUtils.createDirectStream threw "numRecords must not be negative".
This is an IllegalArgumentException: the number of records in an RDD must not be negative.
Exception analysis
First, look at where the exception is thrown: line 38 of org.apache.spark.streaming.scheduler.StreamInputInfo (defined in InputInfoTracker.scala):
/**
 * :: DeveloperApi ::
 * Track the information of input stream at specified batch time.
 *
 * @param inputStreamId the input stream id
 * @param numRecords the number of records in a batch
 * @param metadata metadata for this batch. It should contain at least one standard field named
 *                 "Description" which maps to the content that will be shown in the UI.
 */
@DeveloperApi
case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long,
    metadata: Map[String, Any] = Map.empty) {
  require(numRecords >= 0, "numRecords must not be negative")

  def metadataDescription: Option[String] =
    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}
Line 38 requires numRecords >= 0 and throws when the condition fails, so numRecords must have been negative here. numRecords is documented as "the number of records in a batch", so the record count computed for the current RDD went wrong.
How offsetRanges is computed
offsetRanges is defined as: "offset ranges that define the Kafka data belonging to this RDD".
The record count of a Kafka partition's offset range is computed at line 40 of KafkaRDDPartition:
def count(): Long = untilOffset - fromOffset
// fromOffset: per-topic/partition Kafka offset defining the (inclusive) starting point of the batch
// untilOffset: per-topic/partition Kafka offset defining the (exclusive) ending point of the batch
fromOffset comes from the value saved in ZK; untilOffset is computed at line 145 of DirectKafkaInputDStream:
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
The latest leader offsets are fetched and then clamped with spark.streaming.kafka.maxRatePerPartition to get the maximum allowed untilOffsets. For a freshly created topic with no data, untilOffsets is therefore 0.
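spark.streaming.kafka.maxRatePerPartition is an ordinary Spark configuration entry; a minimal sketch of setting it (the value 1000 records per second is just an example):

import org.apache.spark.SparkConf

// Example value: cap each Kafka partition at 1000 records per second per batch,
// which bounds how far untilOffset can run ahead of fromOffset in a single batch.
val conf = new SparkConf()
  .setAppName("kafka-offset-demo")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")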
Root cause
When a topic is deleted, its offset information in ZK is not cleared, so when the direct Kafka stream starts again it still reads the old offset (old_offset) and uses it as fromOffset. After the topic is recreated, the logic above computes untilOffset as 0 (or greater than 0 if the new topic already has data). The restarted job then computes numRecords = untilOffset - fromOffset(old_offset); whenever untilOffset < old_offset this is negative and the exception is thrown. For example, if old_offset was 1000 and the new, empty topic's untilOffset is 0, then numRecords = 0 - 1000 = -1000. For a freshly recreated topic this situation is very likely.
Solution
Based on the analysis above, when determining the fromOffsets for the direct Kafka stream, compare fromOffset with untilOffset, and when untilOffset < fromOffset reset fromOffset to the initial offset 0:
1. read the fromOffset of each topic/partition from ZK;
2. use SimpleConsumer to get each partition's lastOffset (the untilOffset);
3. compare each partition's lastOffset with its fromOffset;
4. when lastOffset < fromOffset, set fromOffset to 0.
These steps complete the correction of fromOffset.
The core code for correcting the offsets:
/** Offset correction (begin) */
// get the lastOffset of every topic/partition
Map<TopicAndPartition, Long> topicAndPartitionLongMap =
    KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
        topicList, "my.group.id");

// iterate over every topic/partition
for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {
  // fromOffset > lastOffset
  if (topicAndPartitionLongEntry.getValue() > topicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey())) {
    // reset fromOffset to the initial offset 0
    topicAndPartitionLongEntry.setValue(0L);
  }
}
/** Offset correction (end) */
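KafkaOffsetTool here is the author's own helper built on SimpleConsumer. With the newer Kafka consumer client, the same latest offsets can be fetched via endOffsets; a minimal sketch (the broker list and topic name are placeholders, and this consumer is used only to look up offsets):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder connection settings
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "offset-checker")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
try {
  val partitions = consumer.partitionsFor("my_topic").asScala
    .map(p => new TopicPartition(p.topic, p.partition)).asJava
  // endOffsets returns the latest offset of every partition, i.e. the lastOffset
  // that each saved fromOffset is compared against in the correction logic above
  val lastOffsets = consumer.endOffsets(partitions).asScala
    .map { case (tp, offset) => tp -> offset.longValue() }.toMap
  // any saved fromOffset greater than lastOffsets(tp) would then be reset to 0
} finally {
  consumer.close()
}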
Problem 2: Offsets out of range
While testing a Kafka DirectStream job reading from a topic, the job was stopped for a while; on restart it threw an OffsetOutOfRangeException.
The exception:
0/12/16 11:08:33 WARN TaskSetManager: Lost task 2.0 in stage 105.0 (TID 85, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {register_topic-5=23550}
Exception analysis
Offset out-of-range errors come in two flavors: out of range at the head and out of range at the tail. Head overflow means the offset saved in ZooKeeper is before the offset of the oldest message still present in the topic (zk_offset < earliest_offset); tail overflow means the saved offset is beyond the offset of the newest message (zk_offset > last_offset). We already hit and handled the tail case above, so this problem must be a head overflow.
It turned out we had not consumed the topic for roughly seven days, while the log retention configured on the Kafka brokers was also seven days.
So the unconsumed data had been purged by the broker, leaving the offset read from ZooKeeper to the left of the oldest surviving message's offset: an offset that used to be valid had become invalid.
Solution
The obvious fix is to make the streaming job consume the topic data in time, so that the consumption lag never exceeds log.retention.ms. A better fix, though, is to keep the job running when the problem occurs, which means detecting an out-of-range zk_offset and correcting it to a valid value.
The core code for correcting the offsets:
/** Offset correction (begin) */
// latest offsets
Map<TopicAndPartition, Long> lastestTopicAndPartitionLongMap =
    KafkaOffsetTool.getInstance().getLastOffset(kafkaParams.get("metadata.broker.list"),
        Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

// earliest offsets
Map<TopicAndPartition, Long> earliestTopicAndPartitionLongMap =
    KafkaOffsetTool.getInstance().getEarliestOffset(kafkaParams.get("metadata.broker.list"),
        Lists.newArrayList(topicsSet), kafkaParams.get(Constants.KAFKA_CONSUMER_GROUP_ID));

for (Map.Entry<TopicAndPartition, Long> topicAndPartitionLongEntry : fromOffsets.entrySet()) {
  long zkOffset = topicAndPartitionLongEntry.getValue();
  long lastestOffset = lastestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());
  long earliestOffset = earliestTopicAndPartitionLongMap.get(topicAndPartitionLongEntry.getKey());

  // zkOffset is outside the range of offsets still available in the topic
  if (zkOffset > lastestOffset || zkOffset < earliestOffset) {
    // set offset = earliestOffset
    logger.warn("correcting offset: " + zkOffset + " -> " + earliestOffset);
    topicAndPartitionLongEntry.setValue(earliestOffset);
  }
}
/** Offset correction (end) */
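The boundary check itself can be captured in a small helper; earliest and latest would come from the author's KafkaOffsetTool, or from beginningOffsets/endOffsets of the newer consumer API. A minimal sketch:

// zkOffset:  the offset of one partition as read from ZooKeeper
// earliest:  oldest offset still available in the topic for that partition
// latest:    newest offset in the topic for that partition
def correctOffset(zkOffset: Long, earliest: Long, latest: Long): Long =
  if (zkOffset > latest || zkOffset < earliest) earliest else zkOffset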