Flink - metrics

Summary:

Metrics are organized into MetricGroups.

MetricGroup

A MetricGroup is simply a named container for metrics; it can hold sub-groups as well as individual metrics.

Its main interface is therefore registration:

/**
 * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
 * 
 * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
 * hierarchy based on the group names.
 * 
 * <p>A MetricGroup is uniquely identified by its place in the hierarchy and name.
 */
public interface MetricGroup {
    <C extends Counter> C counter(String name, C counter);
    <T, G extends Gauge<T>> G gauge(String name, G gauge);
    <H extends Histogram> H histogram(String name, H histogram);
    MetricGroup addGroup(String name);
}
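
For context, this is roughly how user code gets hold of a MetricGroup and registers a metric on it. A minimal sketch assuming the RichFunction API; the class and counter name are illustrative:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter numProcessed;

    @Override
    public void open(Configuration parameters) {
        // getMetricGroup() returns the MetricGroup of this operator's task;
        // counter(name, counter) registers the counter under that group, as in the interface above
        numProcessed = getRuntimeContext().getMetricGroup().counter("numProcessed", new SimpleCounter());
    }

    @Override
    public String map(String value) {
        numProcessed.inc();
        return value;
    }
}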

 

AbstractMetricGroup

The key piece is the MetricGroup implementation. The logic is simple: both registering a metric and closing the group synchronize on the group instance for mutual exclusion.

/**
 * Abstract {@link MetricGroup} that contains key functionality for adding metrics and groups.
 * 
 */
 
public abstract class AbstractMetricGroup implements MetricGroup {

    /** The registry that this metrics group belongs to */
    protected final MetricRegistry registry;

    /** All metrics that are directly contained in this group */
    private final Map<String, Metric> metrics = new HashMap<>();

    /** All metric subgroups of this group */
    private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

    /** The metrics scope represented by this group.
     *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
    private final String[] scopeComponents;  // the naming scope (namespace) components of this group

    /** The metrics scope represented by this group, as a concatenated string, lazily computed.
     * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
    private String scopeString;

    /** Flag marking this group as closed; checked in addMetric below. */
    private volatile boolean closed;

    @Override
    public <C extends Counter> C counter(String name, C counter) {
        addMetric(name, counter);
        return counter;
    }
    
    /**
     * Adds the given metric to the group and registers it at the registry, if the group
     * is not yet closed, and if no metric with the same name has been registered before.
     * 
     * @param name the name to register the metric under
     * @param metric the metric to register
     */
    protected void addMetric(String name, Metric metric) {
        // add the metric only if the group is still open
        synchronized (this) { // lock: registration must not race with close()
            if (!closed) {
                // immediately put without a 'contains' check to optimize the common case (no collision)
                // collisions are resolved later
                Metric prior = metrics.put(name, metric);

                // check for collisions with other metric names
                if (prior == null) {
                    // no other metric with this name yet

                    registry.register(metric, name, this);
                }
                else {
                    // we had a collision. put back the original value
                    metrics.put(name, prior);
                    
                }
            }
        }
    }
}
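
To make scopeComponents / scopeString concrete: the scope string is just the components joined with the registry's delimiter. A hypothetical helper sketching that computation (names are illustrative, not Flink's exact code):

final class ScopeStrings {

    // Hypothetical sketch: join the scope components with the configured delimiter.
    static String buildScopeString(String[] scopeComponents, char delimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < scopeComponents.length; i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            sb.append(scopeComponents[i]);
        }
        // e.g. ["host-7", "taskmanager-2", "window_word_count", "my-mapper"]
        // becomes "host-7.taskmanager-2.window_word_count.my-mapper"
        return sb.toString();
    }
}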

 

MetricReporter

Collected metrics need a reporter to be exported to an external system:

/**
 * Reporters are used to export {@link Metric Metrics} to an external backend.
 * 
 * <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
 * public no-argument constructor.
 */
public interface MetricReporter {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
     * this method is the place where the reporters set their basic fields based on configuration values.
     * 
     * <p>This method is always called first on a newly instantiated reporter.
     *
     * @param config The configuration with all parameters.
     */
    void open(MetricConfig config);

    /**
     * Closes this reporter. Should be used to close channels, streams and release resources.
     */
    void close();

    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}
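
As a rough illustration of this life cycle, here is a minimal sketch of a custom reporter that just logs the notifications. The class name is made up, and package locations of the Flink types may differ slightly between versions:

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingReporter implements MetricReporter {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingReporter.class);

    @Override
    public void open(MetricConfig config) {
        // reporter-specific settings from metrics.reporter.<name>.* would be read here
        LOG.info("LoggingReporter opened");
    }

    @Override
    public void close() {
        LOG.info("LoggingReporter closed");
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        LOG.info("Added metric {}", metricName);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        LOG.info("Removed metric {}", metricName);
    }
}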

 

AbstractReporter implements the MetricReporter interface (and CharacterFilter):

/**
 * Base class for custom metric reporters.
 */
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
    protected final Logger log = LoggerFactory.getLogger(getClass());

    protected final Map<Gauge<?>, String> gauges = new HashMap<>();
    protected final Map<Counter, String> counters = new HashMap<>();
    protected final Map<Histogram, String> histograms = new HashMap<>();

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        final String name = group.getMetricIdentifier(metricName, this); // the group is only used here to obtain the metric's fully qualified name

        synchronized (this) {
            if (metric instanceof Counter) {
                counters.put((Counter) metric, name);
            } else if (metric instanceof Gauge) {
                gauges.put((Gauge<?>) metric, name);
            } else if (metric instanceof Histogram) {
                histograms.put((Histogram) metric, name);
            } else {
                log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
                    "does not support this metric type.", metric.getClass().getName());
            }
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        synchronized (this) {
            if (metric instanceof Counter) {
                counters.remove(metric);
            } else if (metric instanceof Gauge) {
                gauges.remove(metric);
            } else if (metric instanceof Histogram) {
                histograms.remove(metric);
            } else {
                log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
                    "does not support this metric type.", metric.getClass().getName());
            }
        }
    }
}
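
Concrete reporters usually extend AbstractReporter and, if they want to be called periodically, also implement the Scheduled interface, whose report() method the MetricRegistry (below) invokes on a timer. A minimal sketch under those assumptions (the class name is made up):

import java.util.Map;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.Scheduled;

public class LoggingScheduledReporter extends AbstractReporter implements Scheduled {

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    // required by CharacterFilter (AbstractReporter implements it); no filtering in this sketch
    @Override
    public String filterCharacters(String input) {
        return input;
    }

    // invoked periodically by the MetricRegistry's scheduled executor
    @Override
    public void report() {
        synchronized (this) {
            for (Map.Entry<Counter, String> entry : counters.entrySet()) {
                log.info("{} = {}", entry.getValue(), entry.getKey().getCount());
            }
            for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
                log.info("{} = {}", entry.getValue(), entry.getKey().getValue());
            }
        }
    }
}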

 

MetricRegistry

The MetricRegistry connects MetricGroups with MetricReporters.

It hands every metric that should be reported to the MetricReporters and starts the periodic reporting thread.

/**
 * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
 * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
 */
public class MetricRegistry {
    
    private List<MetricReporter> reporters;
    private ScheduledExecutorService executor;

    private final ScopeFormats scopeFormats;

    private final char delimiter;

    /**
     * Creates a new MetricRegistry and starts the configured reporter.
     */
    public MetricRegistry(Configuration config) {
        // first parse the scope formats, these are needed for all reporters
        ScopeFormats scopeFormats;
        try {
            scopeFormats = createScopeConfig(config);  // read the scope formats from the configuration, i.e. how the metrics namespace is laid out
        }
        catch (Exception e) {
            LOG.warn("Failed to parse scope format, using default scope formats", e);
            scopeFormats = new ScopeFormats();
        }
        this.scopeFormats = scopeFormats;

        char delim;
        try {
            delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);  // read the scope delimiter from the configuration
        } catch (Exception e) {
            LOG.warn("Failed to parse delimiter, using default delimiter.", e);
            delim = '.';
        }
        this.delimiter = delim;

        // second, instantiate any custom configured reporters
        this.reporters = new ArrayList<>();

        final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);  // read the list of configured reporters

        if (definedReporters == null) {
            // no reporters defined
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
            this.executor = null;
        } else {
            // we have some reporters so
            String[] namedReporters = definedReporters.split("\\s*,\\s*");
            for (String namedReporter : namedReporters) {  // for each configured reporter

                DelegatingConfiguration reporterConfig = new DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".");
                final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);  // the configured reporter class name

                try {
                    String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null); // the configured report interval
                    TimeUnit timeunit = TimeUnit.SECONDS;
                    long period = 10;

                    if (configuredPeriod != null) {
                        try {
                            String[] interval = configuredPeriod.split(" ");
                            period = Long.parseLong(interval[0]);
                            timeunit = TimeUnit.valueOf(interval[1]);
                        }
                        catch (Exception e) {
                            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                                    " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                                    "Using default reporting interval.");
                        }
                    }

                    Class<?> reporterClass = Class.forName(className);
                    MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance(); // instantiate the reporter via reflection

                    MetricConfig metricConfig = new MetricConfig();
                    reporterConfig.addAllToProperties(metricConfig);
                    reporterInstance.open(metricConfig);  // open the reporter

                    if (reporterInstance instanceof Scheduled) {
                        if (this.executor == null) {
                            executor = Executors.newSingleThreadScheduledExecutor(); // lazily create the scheduled executor
                        }
                        LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);

                        executor.scheduleWithFixedDelay(
                                new ReporterTask((Scheduled) reporterInstance), period, period, timeunit); // schedule the periodic report task
                    }
                    reporters.add(reporterInstance); // add the reporter to the list
                }
                catch (Throwable t) {
                    shutdownExecutor();
                    LOG.error("Could not instantiate metrics reporter" + namedReporter + ". Metrics might not be exposed/reported.", t);
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Metrics (de)registration
    // ------------------------------------------------------------------------

    /**
     * Registers a new {@link Metric} with this registry.
     *
     * @param metric      the metric that was added
     * @param metricName  the name of the metric
     * @param group       the group that contains the metric
     */
    public void register(Metric metric, String metricName, MetricGroup group) { // called from AbstractMetricGroup.addMetric: adding a metric to a group also registers it with every reporter
        try {
            if (reporters != null) {
                for (MetricReporter reporter : reporters) {
                    if (reporter != null) {
                        reporter.notifyOfAddedMetric(metric, metricName, group); // notify every reporter of the new metric
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error while registering metric.", e);
        }
    }

    /**
     * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
     *
     * @param metric      the metric that should be removed
     * @param metricName  the name of the metric
     * @param group       the group that contains the metric
     */
    public void unregister(Metric metric, String metricName, MetricGroup group) {
        try {
            if (reporters != null) {
                for (MetricReporter reporter : reporters) {
                    if (reporter != null) {
                        reporter.notifyOfRemovedMetric(metric, metricName, group);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error while registering metric.", e);
        }
    }

    // ------------------------------------------------------------------------

    /**
     * This task is explicitly a static class, so that it does not hold any references to the enclosing
     * MetricsRegistry instance.
     *
     * This is a subtle difference, but very important: With this static class, the enclosing class instance
     * may become garbage-collectible, whereas with an anonymous inner class, the timer thread
     * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
     * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,
     * which acts as a fail-safe to stop the timer thread and prevents resource leaks.
     */
    private static final class ReporterTask extends TimerTask {

        private final Scheduled reporter;

        private ReporterTask(Scheduled reporter) {
            this.reporter = reporter;
        }

        @Override
        public void run() {
            try {
                reporter.report();  // the core of the task: invoke reporter.report()
            } catch (Throwable t) {
                LOG.warn("Error while reporting metrics", t);
            }
        }
    }
}
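
Putting the configuration keys the constructor reads together, a hedged sketch of how a registry with one scheduled reporter could be set up. The reporter name "my_logger" and its class refer to the hypothetical LoggingScheduledReporter sketched earlier; the literal key strings behind the ConfigConstants used above may differ across Flink versions:

import org.apache.flink.configuration.Configuration;

// import of MetricRegistry omitted: its package differs between Flink versions
public class MetricRegistrySetup {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // which reporters exist (METRICS_REPORTERS_LIST)
        config.setString("metrics.reporters", "my_logger");
        // per-reporter settings under metrics.reporter.<name>.*
        config.setString("metrics.reporter.my_logger.class",
                "org.example.metrics.LoggingScheduledReporter"); // hypothetical reporter from the sketch above
        config.setString("metrics.reporter.my_logger.interval", "10 SECONDS");
        // scope delimiter (METRICS_SCOPE_DELIMITER)
        config.setString("metrics.scope.delimiter", ".");

        MetricRegistry registry = new MetricRegistry(config); // instantiates, opens and schedules the reporter
    }
}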

 

TaskManager

In the TaskManager, associateWithJobManager does the following:

metricsRegistry = new FlinkMetricRegistry(config.configuration)

taskManagerMetricGroup = 
  new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)

TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)

This creates the metricsRegistry and the TaskManagerMetricGroup.

As you can see, instantiateStatusMetrics simply registers the TaskManager's various status metrics:

private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
    val jvm = taskManagerMetricGroup
      .addGroup("Status")
      .addGroup("JVM")

    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
    instantiateMemoryMetrics(jvm.addGroup("Memory"))
    instantiateThreadMetrics(jvm.addGroup("Threads"))
    instantiateCPUMetrics(jvm.addGroup("CPU"))
  }

  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
    val mxBean = ManagementFactory.getClassLoadingMXBean // ManagementFactory exposes the MXBeans that describe the JVM metrics

    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
      override def getValue: Long = mxBean.getTotalLoadedClassCount
    })
    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
      override def getValue: Long = mxBean.getUnloadedClassCount
    })
  }
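
The remaining instantiate* methods follow the same pattern. For instance, instantiateGarbageCollectorMetrics registers per-collector gauges; here is a Java sketch of that idea (the Scala code aliases Gauge as FlinkGauge; this is not Flink's exact code):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public final class GcStatusMetrics {

    // For each GC MXBean, register gauges for collection count and time
    // under Status.JVM.GarbageCollector.<collector-name>.
    public static void register(MetricGroup gcGroup) {
        for (final GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
            MetricGroup group = gcGroup.addGroup(bean.getName());
            group.gauge("Count", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return bean.getCollectionCount();
                }
            });
            group.gauge("Time", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return bean.getCollectionTime();
                }
            });
        }
    }
}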

 

In submitTask:
  val taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd)

  val task = new Task(
    tdd,
    memoryManager,
    ioManager,
    network,
    bcVarManager,
    selfGateway,
    jobManagerGateway,
    config.timeout,
    libCache,
    fileCache,
    runtimeInfo,
    taskMetricGroup)

We can see that a taskMetricGroup is created for each task

and passed along when the Task object is constructed:

Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
        executionConfig, taskInfo, jobConfiguration, taskConfiguration,
        userCodeClassLoader, memoryManager, ioManager,
        broadcastVariableManager, accumulatorRegistry,
        splitProvider, distributedCacheEntries,
        writers, inputGates, jobManager, taskManagerConfig, metrics, this);

// let the task code create its readers and writers
invokable.setEnvironment(env);

Inside Task, the key step is putting this taskMetricGroup into the RuntimeEnvironment, so that the actual task logic can later obtain the metrics through the RuntimeEnvironment.

A StreamTask is one such invokable; the abstract base class is defined as follows:

public abstract class AbstractInvokable {

    /** The environment assigned to this invokable. */
    private Environment environment;

    /**
     * Starts the execution.
     *
     * <p>Must be overwritten by the concrete task implementation. This method
     * is called by the task manager when the actual execution of the task
     * starts.
     *
     * <p>All resources should be cleaned up when the method returns. Make sure
     * to guard the code with <code>try-finally</code> blocks where necessary.
     * 
     * @throws Exception
     *         Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
     */
    public abstract void invoke() throws Exception;

    /**
     * Sets the environment of this task.
     * 
     * @param environment
     *        the environment of this task
     */
    public final void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /**
     * Returns the environment of this task.
     * 
     * @return The environment of this task.
     */
    public Environment getEnvironment() {
        return this.environment;
    }
}

 

So inside a StreamTask, metrics can be used like this:

getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
    @Override
    public Long getValue() {
        return StreamTask.this.lastCheckpointSize;
    }
});
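
Since this gauge is registered on the task's MetricGroup (created via taskManagerMetricGroup.addTaskForJob above), the registration flows through MetricRegistry.register to every configured reporter, and a Scheduled reporter will include it in its next report() cycle.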