开发者社区> 问答> 正文

关于flink状态后端使用Rocksdb序列化问题

目前采用flink on yarn, flink版本是1.7.2,hadoop是2.8.5

现在遇到个问题,假设订单对象Order,因使用了窗口等,所以有状态产生,假设现在要在Order中添加几个新的字段,采用savepoint重启任务,就会报错,看错误信息是反序列化问题(rocksdb默认用的kryo), 只能kill掉再重启,导致状态丢失,请问有什么解决办法吗?需求不断变更,肯定会定期增加字段的;或者更改Rocksdb的序列化方式,用json?可以吗?

错误信息如下

INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (e241f87f338a95ce7a2b1e8dd96bea0a) switched from state RUNNING to FAILING. TimerException{com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104} at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:492) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:428) at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:146) at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137) at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121) at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112) at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285) ... 7 more*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 22:38:13 1451 0
1 条回答
写回答
取消 提交回答
  • hi, 你看以看一下 1.8 开始支持的 POJO Scheme Evolution[1] 是否满足你的需求,你需要注意的是如何满足 Flink 中判断 POJO 的定义 [2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types*来自志愿者整理的flink邮件归档

    2021-12-08 10:30:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载