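Hi all: I am using the Flink Table API to implement a group-by aggregation and wrote a custom UDAF for it. The job runs correctly the first time it is started on the cluster, but when it is restored from a checkpoint it fails with the error below. With built-in functions there is no problem. How can I fix this?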
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
    at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 9 more
groupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id, UpdateColumn(org_name, evaluation_time_millis) AS TMP_586, UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581, UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk) AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587, UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588, UpdateColumn(area_name, evaluation_time_millis) AS TMP_584, UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582, UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select: (risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587 AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id, risk_name, risk_value, evaluation_time, area_code, area_name, org_code, org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)
*From the Flink mailing list archive, compiled by volunteers
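Did you change the structure of the UDAF's accumulator (ACC) before restoring the job from the checkpoint? Also, if possible, please include the UDAF implementation and the SQL.
*From the Flink mailing list archive, compiled by volunteers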
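To make the question about the ACC structure concrete, here is a minimal sketch of what an accumulator for something like UpdateColumn might look like. The original UDAF was not posted, so everything here is an assumption: the names LatestValueAgg and Acc are hypothetical, and the logic (keep the value with the latest evaluation_time_millis) is only a guess based on how the function is called in the plan above. The sketch is plain Java with no Flink dependency so it can run on its own; in a real job the class would extend org.apache.flink.table.functions.AggregateFunction<String, Acc>. The point it illustrates is that the accumulator is what gets written into the checkpoint, so its class and fields should stay the same between taking the checkpoint and restoring from it; otherwise the restored serializer may be judged incompatible.

```java
import java.io.Serializable;

// Hypothetical stand-in for the UpdateColumn UDAF; not the original implementation.
class LatestValueAgg {

    // The accumulator is the state that ends up in the checkpoint.
    // Assumption: a simple class with a public no-arg constructor and
    // public fields, whose field set is kept unchanged across restarts.
    public static class Acc implements Serializable {
        private static final long serialVersionUID = 1L;
        public String value;
        public long timestamp = Long.MIN_VALUE;

        public Acc() {}
    }

    // Mirrors AggregateFunction#createAccumulator.
    public Acc createAccumulator() {
        return new Acc();
    }

    // Mirrors the accumulate(...) convention: keep the value whose
    // timestamp is the latest one seen so far.
    public void accumulate(Acc acc, String value, long timestampMillis) {
        if (timestampMillis >= acc.timestamp) {
            acc.value = value;
            acc.timestamp = timestampMillis;
        }
    }

    // Mirrors AggregateFunction#getValue.
    public String getValue(Acc acc) {
        return acc.value;
    }

    public static void main(String[] args) {
        LatestValueAgg agg = new LatestValueAgg();
        Acc acc = agg.createAccumulator();
        agg.accumulate(acc, "org-a", 100L);
        agg.accumulate(acc, "org-b", 300L);
        agg.accumulate(acc, "org-c", 200L); // older than 300, ignored
        System.out.println(agg.getValue(acc)); // prints "org-b"
    }
}
```

If the real accumulator has been changed since the checkpoint was taken (a field added, removed, or retyped, or the class swapped for another one), that would be the first thing to check against the StateMigrationException above.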