"我的Flink(1.6)作业侦听流并执行一些聚合。我希望在汇总后收集指标但遇到一些困难。
我的指标看起来像这样:
id_1, 0.1
id_2, 0.3
...
ids将是可变的,并且值会随着时间的推移而增加和减少,因此看起来像Gauge是最合适的。
我创建了这个地图函数来捕获量表中的这些指标:
class MetricsMapper extends RichMapFunction[MyObject, Double] {
override def map(obj: MyObject): Double = {
val metricVal = obj.metricVal
getRuntimeContext.getMetricGroup.gauge[Double, ScalaGauge[Double]](obj.id, ScalaGauge[Double](() => metricVal))
metricVal
}
}
如图所示,我正在使用我的对象的id属性来注册仪表。
我遇到的问题是我在运行工作时收到此警告:
Name collision: Group already contains a Metric with the name ""x"" Metric will not be reported
我解释这一点,因为我们已经在流中创建了这个量表,并且忽略了新值。有办法克服这个问题吗?
"
"你应该遵循文档中显示的模式:
new class MyMapper extends RichMapFunction[MyObject, Double] {
@transient private var valueToExpose = 0.0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge[Double, ScalaGauge[Double]](""MyGauge"", ScalaGauge[Double]( () => valueToExpose ) )
}
override def map(obj: MyObject): String = {
valueToExpose = obj.metricval
valueToExpose
}
}
换句话说,在open()方法中注册一次,并在每次调用map()时更新该值。
在你的情况下,你需要为每个唯一对象ID单独计量。如果你真的想用指标来做这件事,你将不得不保留诸如仪表的散列图之类的东西,根据需要创建新的,以及更新map()函数中相关规范的值。或者更好的是,通过id键入你的流。
在考虑使用指标是否合适时要记住的另一个因素是指标不是检查点。
"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。