Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)

01 引言

附:Flink源码下载地址

在Flink的Web页面,细心的话可以看到监控页面里,有任务的详情,其中里面有详细的监控指标,如下图(发送记录数、接收记录数、发送字节数,接收字节数等):

很多时候,我们都需要 “取出这些数据”,并用在我们的需求上,那么该如何取出这些数据呢?本文来分析下源码。

02 源码分析

2.1 源码入口

Flinkweb页面,按F12查看源码,可以看到:

{
  "jid": "ad75bbaaa624e41a249825a9820a65cc",
  "name": "insert-into_default_catalog.default_database.t_student_copy",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1650352652357,
  "end-time": -1,
  "duration": 64629227,
  "maxParallelism": -1,
  "now": 1650417281584,
  "timestamps": {
    "INITIALIZING": 1650352652357,
    "FAILED": 0,
    "CREATED": 1650352652449,
    "RESTARTING": 0,
    "FAILING": 0,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "RECONCILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RUNNING": 1650352653087
  },
  "vertices": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
      "maxParallelism": 128,
      "parallelism": 1,
      "status": "RUNNING",
      "start-time": 1650352658363,
      "end-time": -1,
      "duration": 64623221,
      "tasks": {
        "CREATED": 0,
        "CANCELING": 0,
        "INITIALIZING": 0,
        "RECONCILING": 0,
        "CANCELED": 0,
        "RUNNING": 1,
        "DEPLOYING": 0,
        "FINISHED": 0,
        "FAILED": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "CANCELING": 0,
    "INITIALIZING": 0,
    "RECONCILING": 0,
    "CANCELED": 0,
    "RUNNING": 1,
    "DEPLOYING": 0,
    "FINISHED": 0,
    "FAILED": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "ad75bbaaa624e41a249825a9820a65cc",
    "name": "insert-into_default_catalog.default_database.t_student_copy",
    "nodes": [
      {
        "id": "cbc357ccb763df2852fee8c4fc7d55f2",
        "parallelism": 1,
        "operator": "",
        "operator_strategy": "",
        "description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
        "optimizer_properties": {
        }
      }
    ]
  }
}

可以看到,vertices.[0].metrics下的内容就是本文要读取的内容,如下图:

Ctrl+H全局搜索Flink源码,我们可能会想到先查看接口 “/jobs/{jobId}”,其实这样效率很低,最好的方法就是使用其 “特殊性”,比如,我们可以从返回的字段read-bytes入手,发现定义的地方在org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo 这个类:

好了,我们可以把IOMetricsInfo这个类当做我们的源码分析入口。

2.2 IOMetricsInfo

IOMetricsInfo,Ctrl+G查看,可以看到这个类有多个地方被调用,其实真正的是被JobDetailsHandler调用了,其它的类后缀都是Test测试类,所以不作为分析的下一步,下面看看JobDetailsHandler

JobDetailsHandler,可以看到指标值是总counts里获取的,继续看counts

counts在这里赋值了:

接下来,我们看看MutableIOMetrics这个类。

2.3 MutableIOMetrics

进入上一步指定的MutableIOMetrics里的addIOMetrics方法,可以看到代码根据程序的运行状态,从不同的地方获取指标值了:

  • 终止状态:从AccessExecution获取了指标值
  • 运行状态:从MetricFetcher获取了指标值

因为我们的程序是运行的,当然,我们需要研究MetricFetcher这个类里面的值是怎么拿到的。

2.3 MetricFetcher

fetcher:是抓取的意思,可以理解为取数据

我翻译了MetricFetcher这个类的注释,内容如下:

package org.apache.flink.runtime.rest.handler.legacy.metrics;
/**
 * MetricFetcher可用于从JobManager和所有注册的taskmanager中获取指标。
 * <p>
 * 只有在调用{@link MetricFetcher#update()}时指标才会被获取,前提是自上次调用传递之后有足够的时间。
 *
 * @author : YangLinWei
 * @createTime: 2022/4/20 10:30 上午
 * @version: 1.0.0
 */
public interface MetricFetcher {
    /**
     * 获取{@link MetricStore},其中包含当前获取的所有指标。
     *
     * @return {@link MetricStore} 包含的所有获取的指标
     */
    MetricStore getMetricStore();
    /**
     * 触发获取指标
     */
    void update();
    /**
     * @return 最近一次更新的时间戳。
     */
    long getLastUpdateTime();
}

继续Ctrl+T查看其实现:

可以看到有几个实现类,毋庸置疑,MetricFetcherImpl是它真正的实现类,看看里面的代码。

2.3.1 MetricFetcherImpl

MetricFetcherImpl里面有几个方法,如下:

我们需要知道这些指标从何而来?里面的代码不多,大部分都不是我们需要的,经一番阅读,可以知道,指标是从queryMetrics这个方法里获取。看看这个方法的代码:

/**
     * Query the metrics from the given QueryServiceGateway.
     *
     * @param queryServiceGateway to query for metrics
     */
    private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
        LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
        queryServiceGateway
                .queryMetrics(timeout)
                .whenCompleteAsync(
                        (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                            if (t != null) {
                                LOG.debug("Fetching metrics failed.", t);
                            } else {
                                metrics.addAll(deserializer.deserialize(result));
                            }
                        },
                        executor);
    }

所以,代码追踪了这么久,发现指标是从网关(MetricQueryServiceGateway)里调接口去获取的,所以我们需要看源码这个网关接口(queryMetrics)的代码实现。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 缓存 关系型数据库
Flink CDC产品常见问题之upsert-kafka增加参数报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
4月前
|
流计算
Flink源码解析
Flink源码解析
59 0
|
4月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
540 0
读Flink源码谈设计:图的抽象与分层
|
4月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
541 1
|
1月前
|
存储 SQL Oracle
flink cdc 时区问题之文档添加参数无效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
SQL Java API
Flink部署问题之committedOffsets指标为负值如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Java
Flink问题之优化消费如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
27 0
|
2月前
|
消息中间件 Oracle 关系型数据库
Flink CDC 数据源问题之参数配置如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
42 0
|
2月前
|
Java 流计算
【极数系列】Flink配置参数如何获取?(06)
【极数系列】Flink配置参数如何获取?(06)
|
3月前
|
Java 流计算
Flink指标汇总
Flink指标汇总