开发者社区> 问答> 正文

FLINK TABLE API 自定义聚合函数UDAF从check point恢复任务报状态序列

大家好:我在使用flink table api 实现group by 聚合操作的时候,自定义了一个UDAF函数,首次在集群上运行的时候是正确的,但是从check point恢复任务的时候报下面错误,但是使用内置的函数就没问题,不知道我该怎么解决呢?

java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible. at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ... 9 more

groupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id, UpdateColumn(org_name, evaluation_time_millis) AS TMP_586, UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581, UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk) AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587, UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588, UpdateColumn(area_name, evaluation_time_millis) AS TMP_584, UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582, UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select: (risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587 AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id, risk_name, risk_value, evaluation_time, area_code, area_name, org_code, org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)

*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 22:07:47 921 0
1 条回答
写回答
取消 提交回答
  • 你在checkpoint 恢复任务前,有修改过 UDAF 的 ACC 结构吗? 另外,如果可以的话,最好也能带上 UDAF 的实现,和 SQL。*来自志愿者整理的flink邮件归档

    2021-12-08 10:29:11
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载