开发者社区> 问答> 正文

Flink Metrics名称冲突

"我的Flink(1.6)作业侦听流并执行一些聚合。我希望在汇总后收集指标但遇到一些困难。

我的指标看起来像这样:

id_1, 0.1
id_2, 0.3
...
ids将是可变的,并且值会随着时间的推移而增加和减少,因此看起来像Gauge是最合适的。

我创建了这个地图函数来捕获量表中的这些指标:

class MetricsMapper extends RichMapFunction[MyObject, Double] {

override def map(obj: MyObject): Double = {

val metricVal = obj.metricVal
getRuntimeContext.getMetricGroup.gauge[Double, ScalaGauge[Double]](obj.id, ScalaGauge[Double](() => metricVal))
metricVal

}
}
如图所示,我正在使用我的对象的id属性来注册仪表。

我遇到的问题是我在运行工作时收到此警告:

Name collision: Group already contains a Metric with the name ""x"" Metric will not be reported
我解释这一点,因为我们已经在流中创建了这个量表,并且忽略了新值。有办法克服这个问题吗?
"

展开
收起
flink小助手 2018-11-28 16:17:21 3029 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "你应该遵循文档中显示的模式:

    new class MyMapper extends RichMapFunction[MyObject, Double] {
    @transient private var valueToExpose = 0.0

    override def open(parameters: Configuration): Unit = {

    getRuntimeContext()
      .getMetricGroup()
      .gauge[Double, ScalaGauge[Double]](""MyGauge"", ScalaGauge[Double]( () => valueToExpose ) )

    }

    override def map(obj: MyObject): String = {

    valueToExpose = obj.metricval
    valueToExpose

    }
    }
    换句话说,在open()方法中注册一次,并在每次调用map()时更新该值。

    在你的情况下,你需要为每个唯一对象ID单独计量。如果你真的想用指标来做这件事,你将不得不保留诸如仪表的散列图之类的东西,根据需要创建新的,以及更新map()函数中相关规范的值。或者更好的是,通过id键入你的流。

    在考虑使用指标是否合适时要记住的另一个因素是指标不是检查点。
    "

    2019-07-17 23:16:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载