We are currently running Flink on YARN; the Flink version is 1.7.2 and Hadoop is 2.8.5.
We have run into a problem. Suppose we have an order POJO, `Order`, that is used in windowed operations, so state is produced for it. If we now add a few new fields to `Order` and restart the job from a savepoint, the restore fails with a deserialization error (RocksDB falls back to Kryo by default). The only way out is to kill the job and start it fresh, which loses all state. Is there a way to solve this? Requirements keep changing, so fields will definitely be added periodically. Alternatively, could we change RocksDB's serialization format, e.g. to JSON? Would that work?
The error message is as follows:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (e241f87f338a95ce7a2b1e8dd96bea0a) switched from state RUNNING to FAILING.
TimerException{com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104}
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:492)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:428)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:146)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
	... 7 more
*From the volunteer-compiled Flink mailing-list archive
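As an aside on the questioner's idea of changing the serialization format: the underlying issue is that Kryo's default binary format is not tolerant of added fields. A versioned, hand-controlled byte format (or equivalently JSON with default values for missing keys) can tolerate them. The sketch below is a plain-Java illustration of that idea with a hypothetical `Order` class; in Flink it would have to be wired in through a custom `TypeSerializer` rather than used directly.

```java
import java.io.*;

// Hypothetical Order with a hand-written, versioned byte format.
// Bytes written at version 1 (before `channel` existed) can still be
// read at version 2, because missing trailing fields keep their defaults.
public class Order {
    public long id;
    public double amount;
    public String channel = "";   // field added in format version 2

    static final int CURRENT_VERSION = 2;

    public byte[] toBytes() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(CURRENT_VERSION); // version header comes first
        out.writeLong(id);
        out.writeDouble(amount);
        out.writeUTF(channel);         // only present since version 2
        out.flush();
        return bos.toByteArray();
    }

    public static Order fromBytes(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        Order o = new Order();
        o.id = in.readLong();
        o.amount = in.readDouble();
        if (version >= 2) {            // tolerate bytes written before the field existed
            o.channel = in.readUTF();
        }
        return o;
    }
}
```

This is essentially what Flink's own POJO serializer does for you from 1.8 onward, so upgrading is usually the simpler route; hand-rolled formats like this are a fallback when the state type cannot be made a Flink POJO.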
Hi, you could take a look at POJO Schema Evolution [1], supported since Flink 1.8, to see whether it meets your needs. Note that your class has to satisfy Flink's rules for POJO types [2].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types
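For reference, the rules in [2] are roughly: the class must be public and standalone (no non-static inner class), have a public no-argument constructor, and every field must be public or accessible through a conventional getter/setter pair. A hypothetical `Order` shaped to satisfy them might look like:

```java
// Hypothetical Order written to meet Flink's POJO rules:
// public top-level class, public no-arg constructor, and all
// fields either public or exposed via getters/setters.
public class Order {
    public String orderId;
    public double amount;

    // Field added later. With POJO schema evolution (Flink >= 1.8),
    // state restored from an older savepoint gets this field's
    // default value instead of failing to deserialize.
    public String couponId = "";

    // Public no-arg constructor is mandatory for the POJO serializer.
    public Order() {}

    public Order(String orderId, double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
}
```

Note that each field type must itself be serializable by Flink without falling back to Kryo; if any field forces a Kryo fallback, schema evolution for that part of the state is lost again.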