我是用flink1.10的ddl的双流窗口join,但是当我新增字段或修改字段类型,把程序重启后,无论是从savepoint处还是checkpoint处重启都是失败,最后只能删除掉checkpoint或savepoint才能使用,但是这样会丢些数据,请问该怎么处理,非常感谢?
报错如下: Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33601e3dd532edccff92bfce124910c6_(1/3) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131) ... 9 common frames omitted Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 common frames omitted Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible. at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167) at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279) ... 15 common frames omitted*来自志愿者整理的FLINK邮件归档
原因是因为新增字段或者修改字段类型后,新的serializer无法(反)序列化原先存储的数据,对于这种有字段增改需求的场景,目前Flink社区主要借助于Pojo或者avro来实现 [1],建议对相关的state schema做重新规划,以满足这种有后续升级需求的场景。
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。