尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
如何在聚合函数中使用State?
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
import java.lang.{Iterable => JIterable}
class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
override def open(context: FunctionContext): Unit = {
// Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
//getRuntimeContext.getState(desc)
val a = this.hashCode()
print(s"hashCode:$a")
super.open(context)
}
override def createAccumulator(): IntDiffSumAccumulator = {
val acc = new IntDiffSumAccumulator()
acc.f0 = 0
acc.f1 = false
acc
}
def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
accumulator.f0 += value
accumulator.f1 = true
}
override def getValue(accumulator: IntDiffSumAccumulator): Int = {
if (accumulator.f1) {
accumulator.f0
} else {
Int.MinValue
}
}
def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]) = {
val iter = its.iterator()
while (true) {
val a = iter.next()
if (a.f1) {
acc.f0 += a.f0
acc.f1 = true
}
}
}
def resetAccumulator(acc: IntDiffSumAccumulator) = {
acc.f0 = 0
acc.f1 = false
}
override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
}*来自志愿者整理的flink邮件归档
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java *来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。