Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 指标参数源码解读(读取数量、发送数量、发送字节数、接收字节数等)(上)

01 引言

附:Flink源码下载地址

在Flink的Web页面,细心的话可以看到监控页面里,有任务的详情,其中里面有详细的监控指标,如下图(发送记录数、接收记录数、发送字节数,接收字节数等):

很多时候,我们都需要 “取出这些数据”,并用在我们的需求上,那么该如何取出这些数据呢?本文来分析下源码。

02 源码分析

2.1 源码入口

Flinkweb页面,按F12查看源码,可以看到:

{
  "jid": "ad75bbaaa624e41a249825a9820a65cc",
  "name": "insert-into_default_catalog.default_database.t_student_copy",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1650352652357,
  "end-time": -1,
  "duration": 64629227,
  "maxParallelism": -1,
  "now": 1650417281584,
  "timestamps": {
    "INITIALIZING": 1650352652357,
    "FAILED": 0,
    "CREATED": 1650352652449,
    "RESTARTING": 0,
    "FAILING": 0,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "RECONCILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RUNNING": 1650352653087
  },
  "vertices": [
    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
      "maxParallelism": 128,
      "parallelism": 1,
      "status": "RUNNING",
      "start-time": 1650352658363,
      "end-time": -1,
      "duration": 64623221,
      "tasks": {
        "CREATED": 0,
        "CANCELING": 0,
        "INITIALIZING": 0,
        "RECONCILING": 0,
        "CANCELED": 0,
        "RUNNING": 1,
        "DEPLOYING": 0,
        "FINISHED": 0,
        "FAILED": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "CANCELING": 0,
    "INITIALIZING": 0,
    "RECONCILING": 0,
    "CANCELED": 0,
    "RUNNING": 1,
    "DEPLOYING": 0,
    "FINISHED": 0,
    "FAILED": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "ad75bbaaa624e41a249825a9820a65cc",
    "name": "insert-into_default_catalog.default_database.t_student_copy",
    "nodes": [
      {
        "id": "cbc357ccb763df2852fee8c4fc7d55f2",
        "parallelism": 1,
        "operator": "",
        "operator_strategy": "",
        "description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])",
        "optimizer_properties": {
        }
      }
    ]
  }
}

可以看到,vertices.[0].metrics下的内容就是本文要读取的内容,如下图:

Ctrl+H全局搜索Flink源码,我们可能会想到先查看接口 “/jobs/{jobId}”,其实这样效率很低,最好的方法就是使用其 “特殊性”,比如,我们可以从返回的字段read-bytes入手,发现定义的地方在org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo 这个类:

好了,我们可以把IOMetricsInfo这个类当做我们的源码分析入口。

2.2 IOMetricsInfo

IOMetricsInfo,Ctrl+G查看,可以看到这个类有多个地方被调用,其实真正的是被JobDetailsHandler调用了,其它的类后缀都是Test测试类,所以不作为分析的下一步,下面看看JobDetailsHandler

JobDetailsHandler,可以看到指标值是总counts里获取的,继续看counts

counts在这里赋值了:

接下来,我们看看MutableIOMetrics这个类。

2.3 MutableIOMetrics

进入上一步指定的MutableIOMetrics里的addIOMetrics方法,可以看到代码根据程序的运行状态,从不同的地方获取指标值了:

  • 终止状态:从AccessExecution获取了指标值
  • 运行状态:从MetricFetcher获取了指标值

因为我们的程序是运行的,当然,我们需要研究MetricFetcher这个类里面的值是怎么拿到的。

2.3 MetricFetcher

fetcher:是抓取的意思,可以理解为取数据

我翻译了MetricFetcher这个类的注释,内容如下:

package org.apache.flink.runtime.rest.handler.legacy.metrics;
/**
 * MetricFetcher可用于从JobManager和所有注册的taskmanager中获取指标。
 * <p>
 * 只有在调用{@link MetricFetcher#update()}时指标才会被获取,前提是自上次调用传递之后有足够的时间。
 *
 * @author : YangLinWei
 * @createTime: 2022/4/20 10:30 上午
 * @version: 1.0.0
 */
public interface MetricFetcher {
    /**
     * 获取{@link MetricStore},其中包含当前获取的所有指标。
     *
     * @return {@link MetricStore} 包含的所有获取的指标
     */
    MetricStore getMetricStore();
    /**
     * 触发获取指标
     */
    void update();
    /**
     * @return 最近一次更新的时间戳。
     */
    long getLastUpdateTime();
}

继续Ctrl+T查看其实现:

可以看到有几个实现类,毋庸置疑,MetricFetcherImpl是它真正的实现类,看看里面的代码。

2.3.1 MetricFetcherImpl

MetricFetcherImpl里面有几个方法,如下:

我们需要知道这些指标从何而来?里面的代码不多,大部分都不是我们需要的,经一番阅读,可以知道,指标是从queryMetrics这个方法里获取。看看这个方法的代码:

/**
     * Query the metrics from the given QueryServiceGateway.
     *
     * @param queryServiceGateway to query for metrics
     */
    private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) {
        LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress());
        queryServiceGateway
                .queryMetrics(timeout)
                .whenCompleteAsync(
                        (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
                            if (t != null) {
                                LOG.debug("Fetching metrics failed.", t);
                            } else {
                                metrics.addAll(deserializer.deserialize(result));
                            }
                        },
                        executor);
    }

所以,代码追踪了这么久,发现指标是从网关(MetricQueryServiceGateway)里调接口去获取的,所以我们需要看源码这个网关接口(queryMetrics)的代码实现。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之设置什么参数可以让多张表同时写入
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之-s参数在yarn-session.sh命令中是否有效
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
5月前
|
SQL 缓存 资源调度
实时计算 Flink版产品使用问题之在Flink on Yarn模式下,如何对job作业进行指标监控
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 关系型数据库
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
|
6月前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错合集之udf是怎么定义接收和返回的数据类型的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
监控 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在配置连接时,添加了scan.startup.mode参数后,出现报错。是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
911 0
|
Web App开发 监控 API
Flink技术源码解析(一):Flink概述与源码研读准备
一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星。关于Flink与其它主流实时大数据处理引擎Storm、Spark Streaming的不同与优势,可参考https://blog.csdn.net/cm_chenmin/article/details/53072498。 出于技术人对技术本能的好奇与冲动,
32400 0