01 引言
附:Flink源码下载地址
在Flink的Web页面,细心的话可以看到监控页面里,有任务的详情,其中里面有详细的监控指标,如下图(发送记录数、接收记录数、发送字节数,接收字节数等):
很多时候,我们都需要 “取出这些数据”,并用在我们的需求上,那么该如何取出这些数据呢?本文来分析下源码。
02 源码分析
2.1 源码入口
在Flink
的web
页面,按F12
查看源码,可以看到:
{ "jid": "ad75bbaaa624e41a249825a9820a65cc", "name": "insert-into_default_catalog.default_database.t_student_copy", "isStoppable": false, "state": "RUNNING", "start-time": 1650352652357, "end-time": -1, "duration": 64629227, "maxParallelism": -1, "now": 1650417281584, "timestamps": { "INITIALIZING": 1650352652357, "FAILED": 0, "CREATED": 1650352652449, "RESTARTING": 0, "FAILING": 0, "FINISHED": 0, "SUSPENDED": 0, "RECONCILING": 0, "CANCELLING": 0, "CANCELED": 0, "RUNNING": 1650352653087 }, "vertices": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "name": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])", "maxParallelism": 128, "parallelism": 1, "status": "RUNNING", "start-time": 1650352658363, "end-time": -1, "duration": 64623221, "tasks": { "CREATED": 0, "CANCELING": 0, "INITIALIZING": 0, "RECONCILING": 0, "CANCELED": 0, "RUNNING": 1, "DEPLOYING": 0, "FINISHED": 0, "FAILED": 0, "SCHEDULED": 0 }, "metrics": { "read-bytes": 0, "read-bytes-complete": true, "write-bytes": 0, "write-bytes-complete": true, "read-records": 0, "read-records-complete": true, "write-records": 0, "write-records-complete": true } } ], "status-counts": { "CREATED": 0, "CANCELING": 0, "INITIALIZING": 0, "RECONCILING": 0, "CANCELED": 0, "RUNNING": 1, "DEPLOYING": 0, "FINISHED": 0, "FAILED": 0, "SCHEDULED": 0 }, "plan": { "jid": "ad75bbaaa624e41a249825a9820a65cc", "name": "insert-into_default_catalog.default_database.t_student_copy", "nodes": [ { "id": "cbc357ccb763df2852fee8c4fc7d55f2", "parallelism": 1, "operator": "", "operator_strategy": "", "description": "Source: TableSourceScan(table=[[default_catalog, default_database, t_student]], fields=[id, name]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.t_student_copy], fields=[id, name])", "optimizer_properties": { } } ] } }
可以看到,vertices.[0].metrics
下的内容就是本文要读取的内容,如下图:
Ctrl+H
全局搜索Flink
源码,我们可能会想到先查看接口 “/jobs/{jobId}
”,其实这样效率很低,最好的方法就是使用其 “特殊性”,比如,我们可以从返回的字段read-bytes
入手,发现定义的地方在org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo
这个类:
好了,我们可以把IOMetricsInfo
这个类当做我们的源码分析入口。
2.2 IOMetricsInfo
在IOMetricsInfo
,Ctrl+G查看,可以看到这个类有多个地方被调用,其实真正的是被JobDetailsHandler
调用了,其它的类后缀都是Test
测试类,所以不作为分析的下一步,下面看看JobDetailsHandler
。
在JobDetailsHandler
,可以看到指标值是总counts
里获取的,继续看counts
counts
在这里赋值了:
接下来,我们看看MutableIOMetrics
这个类。
2.3 MutableIOMetrics
进入上一步指定的MutableIOMetrics
里的addIOMetrics
方法,可以看到代码根据程序的运行状态,从不同的地方获取指标值了:
- 终止状态:从
AccessExecution
获取了指标值 - 运行状态:从
MetricFetcher
获取了指标值
因为我们的程序是运行的,当然,我们需要研究MetricFetcher
这个类里面的值是怎么拿到的。
2.3 MetricFetcher
fetcher:是抓取的意思,可以理解为取数据
我翻译了MetricFetcher
这个类的注释,内容如下:
package org.apache.flink.runtime.rest.handler.legacy.metrics; /** * MetricFetcher可用于从JobManager和所有注册的taskmanager中获取指标。 * <p> * 只有在调用{@link MetricFetcher#update()}时指标才会被获取,前提是自上次调用传递之后有足够的时间。 * * @author : YangLinWei * @createTime: 2022/4/20 10:30 上午 * @version: 1.0.0 */ public interface MetricFetcher { /** * 获取{@link MetricStore},其中包含当前获取的所有指标。 * * @return {@link MetricStore} 包含的所有获取的指标 */ MetricStore getMetricStore(); /** * 触发获取指标 */ void update(); /** * @return 最近一次更新的时间戳。 */ long getLastUpdateTime(); }
继续Ctrl+T查看其实现:
可以看到有几个实现类,毋庸置疑,MetricFetcherImpl
是它真正的实现类,看看里面的代码。
2.3.1 MetricFetcherImpl
MetricFetcherImpl
里面有几个方法,如下:
我们需要知道这些指标从何而来?里面的代码不多,大部分都不是我们需要的,经一番阅读,可以知道,指标是从queryMetrics
这个方法里获取。看看这个方法的代码:
/** * Query the metrics from the given QueryServiceGateway. * * @param queryServiceGateway to query for metrics */ private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { LOG.debug("Query metrics for {}.", queryServiceGateway.getAddress()); queryServiceGateway .queryMetrics(timeout) .whenCompleteAsync( (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { if (t != null) { LOG.debug("Fetching metrics failed.", t); } else { metrics.addAll(deserializer.deserialize(result)); } }, executor); }
所以,代码追踪了这么久,发现指标是从网关(MetricQueryServiceGateway
)里调接口去获取的,所以我们需要看源码这个网关接口(queryMetrics
)的代码实现。