2.7 kafka 动态分区检测
2.7.1 Spark Streaming
Spark Streaming:对于有实时处理业务需求的企业,随着业务增长数据量也会同步增长,将导致原有的 kafka 分区数不满足数据写入所需的并发度,需要扩展 kafka 的分区或者增加 kafka 的 topic,这时就要求实时处理程序,如 SparkStreaming、flink 能检测到 kafka 新增的 topic 、分区及消费新增分区的数据。
接下来结合源码分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition 时能否动态发现新增分区并消费处理新增分区的数据。 Spark Streaming 与 kafka 结合有两个区别比较大的版本,如图所示是官网给出的对比数据:
其中确认的是 Spark Streaming 与 kafka 0.8 版本结合不支持动态分区检测,与 0.10 版本结合支持,接着通过源码分析。
Spark Streaming 与 kafka 0.8 版本结合(源码分析只针对是否分区检测),入口是 DirectKafkaInputDStream 的 compute:
// 改行代码会计算这个job,要消费的每个kafka分区的最大偏移 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = { // 构建KafkaRDD,用指定的分区数和要消费的offset范围 val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) val rdd = KafkaRDD[K, V, U, T, R]( // Report the record number and metadata of this batch interval to InputInfoTracker. context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) val offsetRanges = currentOffsets.map { case (tp, fo) => val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset) } val description = offsetRanges.filter { offsetRange => // Don't display empty ranges. offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange => // Copy offsetRanges to immutable.List to prevent from being modified by the user s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" }.mkString("\n") val metadata = Map("offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) }
第一行就是计算得到该批次生成 KafkaRDD 每个分区要消费的最大 offset。 接着看 latestLeaderOffsets(maxRetries):
// 可以看到的是用来指定获取最大偏移分区的列表还是只有currentOffsets,没有发现关于新增的分区的内容。 @tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err) } else { logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs) latestLeaderOffsets(retries - 1) } } else { o.right.get } }
其中 protected var currentOffsets = fromOffsets,这个仅仅是在构建 DirectKafkaInputDStream 的时候初始化,并在 compute 里面更新:
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
中间没有检测 kafka 新增 topic 或者分区的代码,所以可以确认 Spark Streaming 与 kafka 0.8 的版本结合不支持动态分区检测。
Spark Streaming 与 kafka 0.10 版本结合:入口同样是 DirectKafkaInputDStream 的 compute 方法,捡主要的部分说,Compute 里第一行也是计算当前 job 生成 kafkardd 要消费的每个分区的最大 offset:
// 获取当前生成job,要用到的KafkaRDD每个分区最大消费偏移值 val untilOffsets = clamp(latestOffsets())
具体检测 kafka 新增 topic 或者分区的代码在 latestOffsets()
/** * Returns the latest (highest) available offsets, taking new partitions into account. */ protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer paranoidPoll(c) // 获取所有的分区信息 // make sure new partitions are reflected in currentOffsets val parts = c.assignment().asScala // 做差获取新增的分区信息 val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit // 新分区消费位置,没有记录的化是由auto.offset.reset决定 currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause c.pause(newPartitions.asJava) // find latest available offsets c.seekToEnd(currentOffsets.keySet.asJava) parts.map(tp => tp -> c.position(tp)).toMap }
该方法内有获取 kafka 新增分区,并将其更新到 currentOffsets 的过程,所以可以验证 Spark Streaming 与 kafka 0.10 版本结合支持动态分区检测。
2.7.2 Flink
入口类是 FlinkKafkaConsumerBase,该类是所有 flink 的 kafka 消费者的父类。
在 FlinkKafkaConsumerBase 的 run 方法中,创建了 kafkaFetcher,实际上就是消费者:
this.kafkaFetcher = createFetcher( sourceContext, subscribedPartitionsToStartOffsets, periodicWatermarkAssigner, punctuatedWatermarkAssigner, (StreamingRuntimeContext) getRuntimeContext(), offsetCommitMode, getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), useMetrics);
接是创建了一个线程,该线程会定期检测 kafka 新增分区,然后将其添加到 kafkaFetcher 里。
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Override public void run() { try { // --------------------- partition discovery loop --------------------- List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before // performing the next operation, so that we can escape the loop as soon as possible while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask()); } try { discoveredPartitions = partitionDiscoverer.discoverPartitions(); } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery; // this would only happen if the consumer was canceled; simply escape the loop break; } // no need to add the discovered partitions if we were closed during the meantime if (running && !discoveredPartitions.isEmpty()) { kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); } // do not waste any time sleeping if we're not running anymore if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop break; } } } } catch (Exception e) { discoveryLoopErrorRef.set(e); } finally { // calling cancel will also let the fetcher loop escape // (if not running, cancel() was already called) if (running) { cancel(); } } } }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks()); } discoveryLoopThread.start(); kafkaFetcher.runFetchLoop();
上面,就是 flink 动态发现 kafka 新增分区的过程。不过与 Spark 无需做任何配置不同的是,flink 动态发现 kafka 新增分区,这个功能时需要被开启的。也很简单,需要将 flink.partition-discovery.interval-millis 该属性设置为大于 0 即可。
2.8 容错机制及处理语义
抛出一个问题:实时处理的时候,如何保证数据仅一次处理语义?
2.8.1 Spark Streaming 保证仅一次处理
对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。
对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。
由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
- repartition(1) : Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:
rdd.repartition(1).foreachPartition(partition=>{ // 开启事务 partition.foreach(each=>{//提交数据 }) // 提交事务 }) })
将结果和 offset 一起提交: 也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。
2.8.2 Flink 与 kafka 0.11 保证仅一次处理
若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。
在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。
本例中的 Flink 应用如图 11 所示包含以下组件:
- 一个source,从Kafka中读取数据(即KafkaConsumer)
- 一个时间窗口化的聚会操作
- 一个sink,将结果写回到Kafka(即KafkaProducer)
下面详细讲解 flink 的两段提交思路:
如图所示,Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。
这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。
当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交,过程可见图。
当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图:
当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。
本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:
以上就是 flink 实现恰一次处理的基本逻辑。
2.9 Back pressure背压/反压
消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。
- back pressure 后面一律称为背压。
2.9.1 Spark Streaming 的背压
Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset。
PIDRateEsimator 的 compute 方法如下:
def compute(time: Long, // in milliseconds numElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long // in milliseconds ): Option[Double] = { logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000 val processingRate = numElements.toDouble / processingDelay * 1000 val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate) logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) latestTime = time if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None } else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } } else { logTrace("Rate estimation skipped") None } } }
2.9.2 Flink 的背压
与 Spark Streaming 的背压不同的是,Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息
jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 所示:
阻塞占比在 web 上划分了三个等级:
- OK: 0 <= Ratio <= 0.10,表示状态良好;
- LOW: 0.10 < Ratio <= 0.5,表示有待观察;
- HIGH: 0.5 < Ratio <= 1,表示要处理了。
例如,0.01,代表着100次中有一次阻塞在内部调用
03 文末
本文主要讲解了Flink与Spark的对比,谢谢大家的阅读,本文完!