Flink Metrics 简介

本文涉及的产品
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
简介: Fink Metrics 是 Flink 内部的一个度量系统,除了可以在 Flink UI 上展示运行时的指标,也可以对外暴露接口集成到第三方系统,本文详述了这两方面的应用

一、Flink metrics简介

Flink 的 metrics 是 Flink 公开的一个度量系统,metrics 也可以暴露给外部系统,通过在 Flink 配置文件 conf/flink-conf.yaml 配置即可,Flink原生已经支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。

我们也可以自定义指标通过 metric 收集,实际开发时经常需要查看当前程序的运行状况,flink 提供了 UI 界面,有比较详细的统计信息。

但是 UI 界面也有不完善的地方,比如想要获取 flink 的实时吞吐。本文将详细介绍如何通过 metric 监控 flink 程序,自定义监控指标以及 metrics 在 flink 的 UI 界面的应用。

二、Metrics在UI页面上的应用

在 flink 的 UI 的界面上我们点击任务详情,然后点击 Task Metrics 会弹出如下的界面,在 add metic 按钮上 我们可以添加我需要的监控指标。

注意:如果点击 Task Metrics 没有显示 Add metics 点击一下任务的 DAG 图就会显示出来,当我们点击了 DAG 图中某个算子的名字,那么 Add metric 显示的就是该算子的监控指标,且按照分区显示,算子名前置的数字就是分区号。

三、各个指标的含义

关于各个指标的含义官网上有详细介绍:

   https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#availability

四、自定义监控指标

案例:在map算子内计算输入的总数据,设置 :

MetricGroup为:flink_test_metric,指标变量为:mapDataNub

DataStream<String> userData = kafkaData.map(new RichMapFunction<String, String>() {
            Counter mapDataNub;
            @Override
            public void open(Configuration parameters) throws Exception {
                mapDataNub=  getRuntimeContext()
                       .getMetricGroup()
                       .addGroup("flink_test_metric")
                       .counter("mapDataNub");
            }
            @Override
            public String map(String s)  {
                String s1 ="";
                try {
                    String[] split = s.split(",");
                    long userID = Long.parseLong(split[0]);
                    long itemId = Long.parseLong(split[1]);
                    long categoryId = Long.parseLong(split[2]);
                    String behavior = split[3];
                    long timestamp = Long.parseLong(split[4]);
                    Map map = new HashMap();
                    map.put("userID", userID);
                    map.put("itemId", itemId);
                    map.put("categoryId", categoryId);
                    map.put("behavior", behavior);
                    map.put("timestamp", timestamp);
                    s1 = JSON.toJSONString(map);
                    mapDataNub.inc();
                    System.out.println("数据"+map.toString());
                } catch (NumberFormatException e) {
                    e.printStackTrace();
                }
                return  s1;
            }

程序启动之后就可以在任务的ui界面上查看

注意点:

  1. 搜索自定义或者查看某个指标需要点击DAG图中对应算子的名称
  2. 指标的前缀0,1,2....是指算子的分区数
  3. 进行监控时,尽量不要对算子进行重命名,使用默认的名字,这样一套监控程序可以监控多个flink任务,比如对sink重新命名,如果不同的flink程序对sink的命名不一样,则一套监控无法监控多个flink程序
.addSink(KafkaSink.getProducer()).name("kafka_sink");

五、Flink UI 不显示算子数据接收和发送的条数

有时候我们Flink任务正常运行,数据也可以打印,而且都保存到数据库了,但是UI上面却不显示数据接收和发送的条数 ,导致无法进行指标监控和查查flink任务运行的具体情况,这是什么原因导致的呢?

    原因:是因为默认情况下Flink开启了operator chain,所以当flink程序所有的算子都在一个chain里面时,也就是在一个DAG(task)里面,所有没有向下游发送数据,所以显示都为0。比如下图的情况所有指标都是0;

解决方案:第一种方法:在flink程序里添加自定义metric

         第二种方法:使用startNewChain和disableChainin打断程序默认的operator chain

         第三种方法:修改某个算子的并行度使其和上下游算子并行度不一致

六、Metric Reporter

     Metrics可以暴露给外部系统,通过在flink配置文件conf/flink-conf.yaml配置即可,flink原生已经支持了很多reporter,如JMX、InfluxDB、Prometheus等等,同时也支持自定义reporter。Flink自带了很多Reporter,包括JMX、InfluxDB、Prometheus等等,接下来介绍下InfluxDB Reporter的使用。只需在flink配置文件conf/flink-conf.yaml中配置Influxdb相关信息即可,主要包括域名、端口号、用户密码等等。

flink1.10之后采用

metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000
metrics.reporter.influxdb.interval: 30 SECONDS

flink1.10之前

metrics.reporters: influxdb
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_monitor
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: 123

注意事项:收集flinkSQL任务的监控指标,如果用户书写的sql语句 insert into 或者insert overwrite 中单引号带有换行符,写入influxdb会报错

查看influxdb收集到监控信息,发现会自动给我生成数据库和measurement,所有的指标都存储在了具体的measurement中。

七、flink metric监控程序

    前面介绍了flink公共的监控指标以及如何自定义监控指标,那么实际开发flink任务我们需要及时知道这些监控指标的数据,去获取程序的健康值以及状态。这时候就需要我们通过 flink REST API ,自己编写监控程序去获取这些指标。很简单,当我们知道每个指标请求的URL,我们便可以编写程序通过http请求获取指标的监控数据。

八、flink REST API监控程序

为了获取flink任务运行状态和吞吐量我们需要注意一下两点:

  1. flink集群模式需要知道 JobManager 的地址和端口(5004)
  2. 对于 flink on yarn 模式来说,则需要知道 RM 代理的 JobManager UI 地址,例如 http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx

1.获取flink任务运行状态(我们可以在浏览器进行测试,输入如下的连接)

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs

返回的结果

{
  jobs: [{
      id: "ce793f18efab10127f0626a37ff4b4d4",
      status: "RUNNING"
    }
  ]
}

2.获取 job 详情

   需要在/jobs/jobid

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs/ce793f18efab10127f0626a37ff4b4d4

{
  jid: "ce793f18efab10127f0626a37ff4b4d4",
  name: "Test",
  isStoppable: false,
  state: "RUNNING",
  start - time: 1551577191874,
  end - time: -1,
  duration: 295120489,
  now: 1551872312363,
  。。。。。。
      此处省略n行
    。。。。。。
      }, {
        id: "cbc357ccb763df2852fee8c4fc7d55f2",
        parallelism: 12,
        operator: "",
        operator_strategy: "",
        description: "Source: Custom Source -&gt; Flat Map",
        optimizer_properties: {}
      }
    ]
  }
}

九、更灵活的方式获取每个指标的请求连接

有人可能会问,这么多指标,难道我要把每个指标的请求的URL格式都记住吗?今天教大家一个小技巧,一个前端技术,就是进入flink任务的UI界面,按住F12进入开发者模式,然后我们点击任意一个metric指标,便能立即看到每个指标的请求的URL。比如获取flink任务的背压情况:

如下图我们点击某一个task的status,按一下f12,便看到了backpressue,点开backpressue就是获取任务背压情况的连接如下:

http://127.0.0.1/proxy/application_12423523_133234/jobs/86eb310874aeccb37b58ae2892feced3/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure

请求连接返回的json字符串如下:我们可以获取每一个分区的背压情况,如果不是OK状态便可以进行任务报警,其他的指标获取监控值都可以这样获取 简单而又便捷。

十、案例:实时获取yarn上flink任务运行状态

    我们使用 flink REST API的方式,通过http请求实时获取flink任务状态,不是RUNNING状态则进行电话或邮件报警,达到实时监控的效果。

public class SendGet {
    public static String sendGet(String url) {
        String result = "";
        BufferedReader in = null;
        try {
            String urlNameString = url;
            URL realUrl = new URL(urlNameString);
            // 打开和URL之间的连接
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }
    public static void main(String[] args) {
        String s = sendGet("http://127.0.0.1:5004/proxy/application_1231435364565_0350/jobs");
        JSONObject jsonObject = JSON.parseObject(s);
        String string = jsonObject.getString("jobs");
        String substring = string.substring(1, string.length() - 1);
        JSONObject jsonObject1 = JSONObject.parseObject(substring);
        String status = jsonObject1.getString("status");
        System.out.println(status);
    }
}

结果

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
Oracle 关系型数据库 数据库连接
实时计算 Flink版操作报错之遇到Unable to register metrics as an,该怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 监控 API
Flink教程(27)- Flink Metrics监控
Flink教程(27)- Flink Metrics监控
515 1
|
6月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用合集之metrics缺少debezium的相关指标,如何获取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
164 1
|
监控 大数据 API
大数据Flink Metrics监控
大数据Flink Metrics监控
111 0
|
分布式计算 Java Hadoop
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
|
存储 SQL 分布式计算
Flink 引擎简介 | 青训营笔记
从产品技术来看,Flink 具备如下流计算技术特征:完全一次保证:故障后应正确恢复有状态运算符中的状态;低延迟:越低越好。许多应用程序需要亚秒级延迟;高吞吐量:随着数据速率的增长,通过管道推送大量数据至关重要;强大的计算模型:框架应该提供一种编程模型,该模型不限制用户并允许各种各样的应用程序在没有故障的情况下,容错机制的开销很低;流量控制:来自慢速算子的反压应该由系统和数据源自然吸收,以避免因消费者缓慢而导致崩溃或降低性能;乱序数据的支持:支持由于其他原因导致的数据乱序达到、延迟到达后,计算出正确的结果;完备的流式语义:支持窗口等现代流式处理语义抽象;
219 0
Flink 引擎简介 | 青训营笔记
|
存储 SQL 资源调度
环境篇之 flink 简介|学习笔记
快速学习环境篇之 flink 简介
143 0
环境篇之 flink 简介|学习笔记
|
消息中间件 分布式计算 大数据
Flink原理简介和使用(3)
Flink原理简介和使用(3)
214 0
Flink原理简介和使用(3)
|
流计算
Flink原理简介和使用(2)
Flink原理简介和使用(2)
225 0
Flink原理简介和使用(2)

相关产品

  • 实时计算 Flink版