监控Apache Flink应用程序101

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 这篇博客文章介绍了Apache Flink的内置监控和指标系统,使开发人员能够有效地监控他们的Flink程序。通常情况下,对于刚开始使用流处理和Apache Flink的DevOps团队,选择相关指标来监控Flink程序可能是比较困难的。

这篇博客文章介绍了Apache Flink的内置监控和指标系统,使开发人员能够有效地监控他们的Flink程序。通常情况下,对于刚开始使用流处理和Apache Flink的DevOps团队,选择相关指标来监控Flink程序可能是比较困难的。在参与部署过许多大规模Flink环境后,我想在这里与社区分享我的经验和一些最佳实践。

对于在Apache Flink上运行的关键业务程序,其性能监控成为生产环境中越来越重要的部分。它可确保快速发现并处理业务程序的性能下降、异常停止等问题。

监控与可观察性密切相关,可观察性是故障排除和性能调优的先决条件。如今,随着现代企业应用程序的复杂性和交付速度的提高,工程团队必须在任何给定的时间点理解并全面了解其应用程序的状态。

1.Flink的度量系统

监控Flink程序的基础是其度量系统 ,它由两部分组成:度量指标和度量报告。

1.1 度量指标

Flink提供了一整套内置指标,例如:

  • 使用的JVM堆/非堆/直接内存(每个TaskManager/JobManager)
  • 程序重启次数(每个程序)
  • 每秒记录数(每个算子)
  • ...

这些指标具有不同的衡量范围,不仅包含Flink本身,还有更普遍的JVM,操作系统指标。

作为用户,可以为自己的功能添加特定应用程序的指标。通常这些包括无效记录数计数器或在State中临时缓冲的记录数计数器。除了计数器,Flink还提供其他指标类型,如仪表和直方图。有关如何使用Flink的指标系统注册自己的指标的说明,请查看Flink的文档。在这篇博客文章中,我们将重点介绍如何充分利用Flink的内置指标。

1.2 度量报告

通过Flink的REST API可以查询所有指标,同时用户可以配置度量报告系统,将指标发送到外部系统。Apache Flink为度量报告提供了开箱即用的最常见的监控工具,包括JMX,Prometheus,Datadog,Graphite和InfluxDB。有关如何配置报告的信息,请查看Flink的MetricsReporter文档。
在本博文的其余部分,我们将介绍一些最重要的指标来监控你的Apache Flink应用程序。

2.监测指标

要监控的第一件事是你的程序是否实际处于运行状态。此外,它还可以监控重启次数和自上一次重启的时间。

一般来说,成功的检查点是检测应用程序整体状况的关键因素。对于每个检查点,检查点挡板需要流经Flink程序的整个拓扑,拓扑中正常的业务事件和挡板按顺序处理不能互相超越。因此,成功的检查点可以表示没有通道被完全阻塞。

关键指标

度量指标 范围 描述
uptime 程序 程序没有中断的运行时长。
fullRestarts 程序 自提交此程序以来完全重新启动的总次数。
numberOfCompletedCheckpoints 程序 成功完成检查点的数量。
numberOfFailedCheckpoints 程序 失败检查点的数量。

示例仪表板面板

图片1.png

正常运行时间(35分钟),重启时间(3毫秒)和完全重启次数(7)


图片2.png

完成检查点(18336),失败(14)

可能的警报

  • ΔfullRestarts > threshold
  • ΔnumberOfFailedCheckpoints > threshold

3.监控进度和吞吐量

现在可以知道你的程序是否正在运行,它的检查点是否正常完成,但还不能知道程序是否在不断消费,能否跟得上上游系统数据的产生速度。

3.1 吞吐量

Flink提供多个指标来衡量我们的应用程序的吞吐量。一个程序可以包含多个链式任务 Flink计算进出的记录和字节数。在这些度量中,每个操作员的传出记录的速率通常是最直观和最容易推理的。对于每一个算子或任务(一个任务可以包含多个串起来的子任务),Flink记录了进入和写出的记录数和字节数大小。在这些度量指标中,每个算子的写出速率通常是最直观,最容易理解的。

关键指标

度量指标 范围 描述
numRecordsOutPerSecond 任务 此任务每秒发送的记录数。
numRecordsOutPerSecond 算子 此算子每秒发送的记录数。

示例仪表板面板

图片3.png

每个算子每秒平均记录数

可能的警报

  • recordsOutPerSecond= 0(对于非Sink算子)

注意:Source算子始终没有传入记录,Sink算子始终没有传出记录,因为度量标准仅计算Flink内部通信。不过JIRA Ticket可以改变这个情况。

3.2 进展
对于使用事件时间语义的应用程序,随时间推移水位就显得非常重要。时间水位t告诉整个程序,它不应再期望接收时间戳早于t的事件,并且触发程序调度时间戳< t的所有操作  。例如,一旦水位通过30,程序将关闭并计算在t = 30 结束的时间窗口内的事件。

因此,在应用程序中对事件时间敏感的算子需要监控水位,例如过程函数和窗口。如果当前处理时间和水位之间的差异非常高,那么它通常意味着两个问题。第一,它可能意味着你正在处理旧事件,例如在停机后追数据或程序处理速度没办法赶上上游数据产生速度,上游事件堆积。第二,这可能意味着单个上游子任务长时间没有发送水位(例如,因为它没有接收任何基于水位的事件),这也阻止了下游算子的水位处理。这个JIRA Ticket为后者提供了进一步的信息和解决方案。

关键指标

度量指标 范围 描述
currentOutputWatermark 算子 此算子发出的最后一个水印。

示例仪表板面板

图片4.png

拓扑中单个算子的每个子任务的事件时间延迟。在这种情况下,对于每个子任务,时间水位落后几秒钟。

可能的警报

  • currentProcessingTime - currentOutputWatermark > threshold

3.3 “紧跟”
当从消息队列中进行消费时,通常可以直接监视应用程序是否跟得上消息产生速度。通过使用特定连接器的度量指标,你可以监视当前消费者组的消息与最新消息的差距。Flink可以基于大多数Source连接器转发其基础指标。

关键指标

度量指标 范围 描述
records-lag-max 用户 适用于FlinkKafkaConsumer。此窗口中任何分区的记录数最大延迟。随着时间的推移,越来越大的延时表明消费者没有跟上生产者的速度。
millisBehindLatest 用户 适用于FlinkKinesisConsumer。消费者距离最新消息的毫秒数。对于任何消费者和Kinesis分片,这表示它与当前时间之间的差距。

可能的警报

  • records-lag-max > threshold
  • millisBehindLatest > threshold

4.监控时延

一般而言,时延是指事件创建与基于此事件的结果变得可见之间的时间延迟。创建事件后,它通常存储在持久性消息队列中,然后由Apache Flink处理,将结果写入数据库或调用下游系统。在这样的数据处理管道中,可以在每个阶段引入时延。原因有多种,包括:

  1. 在事件持久存储在消息队列中之前,可能需要不同的时间。
  2. 在高负载期间或恢复期间,事件可能会在消息队列中花费一些时间,直到Flink处理它们(请参阅上一节)。
  3. 出于功能原因,流式拓扑中的一些函数需要缓冲事件一段时间(例如,在时间窗口中)。
  4. Flink拓扑(框架或用户代码)中的每个计算以及每个网络shuffle都需要时间并增加时延。
  5. 如果应用程序通过事务型Sink节点写出,则Sink节点将仅在Flink的成功检查点上提交和下发事务,每个检查点之间的间隔时间也将增加时延。

实际上,事实证明,在多个阶段(事件创建,存储,进入Flink,写出Flink,若数据量过大可以只采样部分数据)为事件添加时间戳是非常有价值的。这些时间戳之间的差异可以作为Flink拓扑中的用户自定义度量标准展示,以获得每个阶段的延迟分布情况。

在本节的剩下部分,我们只考虑在Flink拓扑中的时延,但并不包括事务型sink节点或由于函数原因缓存事件的节点。

为此,Flink提供了一项称为时延跟踪的功能。启用后,Flink将在所有来源定期插入延迟标记事件。对于每个子任务,将报告从每个源到此算子的延迟分布。可以通过根据需要设置metrics.latency.granularity来进一步控制这些直方图的粒度。

由于可能存在大量的直方图(特别是对于 metrics.latency.granularity:子任务),启用延迟跟踪会显着影响群集的性能。建议仅在调试期间使其能够找到延迟源。

度量

度量指标 范围 描述
latency 算子 从源算子到此算子的时延。
restartingTime 程序 重新启动程序所花费的时间,或当前重新启动的持续时间。

示例仪表板面板

图片5.png

Source和单个Sink子任务之间的时延分布。

4.JVM指标

到目前为止,我们只关注了Flink特定的指标。只要你的应用程序时延和吞吐量符合你的期望并且检查点持续正常,整个程序应该是没有问题的。但另一方面,如果程序性能开始下降,你首要考虑的指标就是TaskManager&JobManager JVM的内存消耗和CPU负载。

4.1 内存

Flink报告了JobManagers和TaskManagers的Heap,NonHeap,Direct和Mapped内存的使用情况。

  • 堆内存 - 与大多数JVM应用程序一样 - 是最易于观察的重要指标。尤其是在使用Flink的文件系统statebackend时,因为它将所有状态对象保留在JVM堆上。如果堆上的对象大小显着增加,这通常可归因于应用程序状态的大小(检查堆栈状态的估计大小的检查点指标)。增长状态的可能原因是特定于应用程序的。通常,越来越多的主键,不同输入流之间的事件时间偏差过大或者仅仅缺少状态清理都可能导致整个堆呈增长状态。
  • NonHeap内存由元空间控制,默认情况下其大小不受限制,并保存类元数据和静态内容。有一个 JIRA Ticket默认将大小限制为250兆字节。
  • 直接内存的最大驱动因素是Flink的网络缓冲区数量,可以配置
  • 映射内存通常接近零,因为Flink不使用内存映射文件。

在容器化环境中,你还应监视JobManger和TaskManager容器的总体内存消耗,以确保它们不超过其资源限制。当使用RocksDB状态后端时,要注意RocksDB会从堆下分配大量内存。如果想了解更多关于RocksDB使用内存量的多少,可以查看 Stefan Richter 撰写的这篇博客文章

关键指标

度量指标 范围 描述
Status.JVM.Memory.NonHeap.Committed 程序/任务管理器 保证JVM可用的非堆内存量(以字节为单位)。
Status.JVM.Memory.Heap.Used 程序/任务管理器 当前使用的堆内存量(以字节为单位)。
Status.JVM.Memory.Heap.Committed Job-/TaskManager 保证可供JVM使用的堆内存量(以字节为单位)。
Status.JVM.Memory.Direct.MemoryUsed Job-/TaskManager JVM用于直接缓冲池的内存量(以字节为单位)。
Status.JVM.Memory.Mapped.MemoryUsed Job-/TaskManager 执行G1 Young Generation垃圾收集所花费的总时间。
Status.JVM.GarbageCollector.G1 Young Generation.Time Job-/TaskManager JVM用于直接缓冲池的内存量(以字节为单位)。
Status.JVM.GarbageCollector.G1 Old Generation.Time Job-/TaskManager 执行G1 Old Generation垃圾收集所花费的总时间。

示例仪表板面板

图片6.png

TaskManager内存消耗和垃圾收集时间。

图片7.png

JobManager内存消耗和垃圾收集时间。

可能的警报

  • container memory limit < container memory + safety margin

4.2 CPU

除了内存,你还应该监视TaskManagers的CPU负载。如果你的TaskManagers经常处于非常高的负载下,你可以通过减少每个TaskManager的任务槽数来提高整体性能(如果是Standalone部署),为TaskManager提供更多资源(如果是容器化部署),或提供更多的TaskManagers。通常,在正常业务期间已经在非常高的负载状态下运行的系统,在从停机时间恢复之后将需要更多的时间来追数据。在此期间,你将看到比平时更高的延迟(事件时间偏差)。

CPU负载的突然增加也可能是因为过多垃圾收集压力,这在JVM内存指标中可见。

如果一个或几个TaskManagers一直处于非常高的负载下,由于长检查点对齐时间和事件时间偏差增加,这可能会降低整个拓扑结构的速度。常见的原因是数据分区键的数据倾斜,这可以通过在shuffle之前预先聚合或者将分区键更改为均匀分布的主键上来减轻数据倾斜。

关键指标

范围 描述
Status.JVM.CPU.Load Job-/TaskManager JVM最近的CPU使用情况。

示例仪表板面板

图片8.png

TaskManager和JobManager CPU加载。

5.系统资源

除了上面的JVM指标之外,还可以使用Flink的指标系统来收集有关系统资源,即整个计算机的内存,CPU和网络相关指标,而不仅仅是Flink程序。默认情况下禁用系统资源监视,并且需要对类路径有其他依赖性。可以查看 Flink系统资源指标文档以获取相关指导和详细信息。Flink中的系统资源监视在没有现有主机监视功能的设置中非常有用。

6.结论

这篇文章主要阐明Flink的指标和监控系统。当你是第一次考虑如何成功实现监控Flink应用程序时,可以通过这方面的知识学习作为起点。我更推荐在开发阶段的早期就开始监控你的Flink应用程序。这样能够随着时间的推移改进仪表板和警报,更重要的是,你在整个开发阶段观察应用程序更改对性能的影响。通过这样做你可以提出有关应用程序运行时行为的正确问题,在早期就可以了解到有关Flink内部的更多信息。

最后想说的是,这篇文章仅涉及Apache Flink的整体指标和监控功能。本人建议可以浏览 Flink的指标文档 ,以获取Flink指标系统的完整参考。


本文由阿里云开发者社区组织翻译。

文章原标题《Monitoring Apache Flink Applications 101》

作者:AJ Christensen

译者:么凹

校对:校对者:杨阳(时溪)

文章为简译,更为详细的内容,请查看原文

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
201 1
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
671 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
74 3
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
58 1
|
2月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
|
4月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
46 1
|
3月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
246 2
|
4月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
56 3
|
4月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
53 2

推荐镜像

更多