
Flink savepoint migration problem

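Hi all, after upgrading Flink, my job no longer restores correctly from a savepoint.

The upgrade is from 1.9.3 to 1.11.0 or 1.11.3.

The job uses the Pulsar connector. The source's checkpoint state is declared as follows: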

    oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
            OFFSETS_STATE_NAME,
            TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
            })));

    oldUnionSubscriptionNameStates =
        stateStore.getUnionListState(
            new ListStateDescriptor<>(
                OFFSETS_STATE_NAME + "_subName",
                TypeInformation.of(new TypeHint () {
                })));

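Locally, I saved the state of the 1.9.3 job with the bin/flink savepoint command, stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.

After the job starts, it fails with the following error: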

2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more

Could anyone suggest a way to troubleshoot this, or a solution? *From the Flink mailing list archive, compiled by volunteers

EXCEED 2021-12-01 14:48:33
1 answer
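  • Is the new cluster running the updated Pulsar connector? I looked at the pulsar-flink code, and that update breaks state compatibility. The offsets state is now declared as: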

        unionOffsetStates = stateStore.getUnionListState(
            new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
                })));
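To illustrate how a changed state layout can surface as the "EOFException: No more bytes left" in the trace above, here is a minimal sketch in plain Java. It uses java.io streams as a stand-in, not Flink's TupleSerializer or Kryo, and the class and method names are hypothetical. It only models the general failure mode, where bytes written with fewer fields are read by code that expects more; it does not establish whether the tuple type in the descriptor or the Kryo-serialized MessageId class is what actually changed in this case.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class LayoutMismatchDemo {

    // Writes one entry in a two-field layout: (topic, messageId bytes).
    static byte[] writeTwoFields(String topic, byte[] messageId) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(topic);
            out.writeInt(messageId.length);
            out.write(messageId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads one entry assuming a three-field layout:
    // (topic, messageId bytes, subscription name).
    static String readThreeFields(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();                              // first field: topic
            byte[] messageId = new byte[in.readInt()];
            in.readFully(messageId);                   // second field: messageId bytes
            return in.readUTF();                       // third field: absent in two-field data
        }
    }

    // Returns true if the three-field reader runs out of input on these bytes.
    static boolean failsWithEof(byte[] bytes) {
        try {
            readThreeFields(bytes);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldBytes = writeTwoFields("demo-topic", new byte[] {1, 2, 3});
        System.out.println("EOF when read with the newer layout: " + failsWithEof(oldBytes));
    }
}
```

The reader consumes the two fields that exist and then hits end of input on the third, which is the same shape of failure as a deserializer asking for bytes the savepoint never contained.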

    Possible fixes:
    1. Try rewriting the state with the state-processor-api.
    2. Check whether the 1.9 pulsar-flink connector can run on Flink 1.11.

    It also looks like there was a further incompatible update after that one:

      new ListStateDescriptor<>(
          OFFSETS_STATE_NAME,
    -     TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
    +     TypeInformation.of(new TypeHint<Tuple3<TopicRange, MessageId, String>>() {
          })));

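    For an incompatible state change where a schema migration is hard to do, you could try the approach used by StatefulSinkWriterOperator: read both the old and the new state, but write only the new state.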
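The "read old and new state, write only new state" idea can be sketched in plain Java without Flink dependencies. Everything here is hypothetical: OldOffset and NewOffset stand in for the Tuple2 and Tuple3 entries, and message IDs are plain strings. In a real source the inputs would come from list states obtained in initializeState, and this sketch assumes the new-format state is registered under a different name from the old one, so that both can be read side by side.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetStateMigrationSketch {

    // Hypothetical stand-in for an old-format entry: (topic, messageId).
    static final class OldOffset {
        final String topic;
        final String messageId;

        OldOffset(String topic, String messageId) {
            this.topic = topic;
            this.messageId = messageId;
        }
    }

    // Hypothetical stand-in for a new-format entry: (topic, messageId, subscriptionName).
    static final class NewOffset {
        final String topic;
        final String messageId;
        final String subscriptionName;

        NewOffset(String topic, String messageId, String subscriptionName) {
            this.topic = topic;
            this.messageId = messageId;
            this.subscriptionName = subscriptionName;
        }
    }

    // Restore path: keep new-format entries as they are and upgrade old-format ones.
    // The returned list is what the operator would hold in memory and later snapshot,
    // always in the new format, so the old state is never written again.
    static List<NewOffset> restore(
            List<OldOffset> oldEntries, String oldSubscriptionName, List<NewOffset> newEntries) {
        List<NewOffset> restored = new ArrayList<>(newEntries);
        for (OldOffset old : oldEntries) {
            restored.add(new NewOffset(old.topic, old.messageId, oldSubscriptionName));
        }
        return restored;
    }

    public static void main(String[] args) {
        List<OldOffset> fromOldSavepoint = List.of(new OldOffset("topic-a", "1:2:0"));
        List<NewOffset> restored = restore(fromOldSavepoint, "sub-1", List.of());
        for (NewOffset offset : restored) {
            System.out.println(offset.topic + " " + offset.messageId + " " + offset.subscriptionName);
        }
    }
}
```

After the first successful checkpoint on the new version, only new-format entries exist, so the old-state read path becomes a no-op and can eventually be removed.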

    It may be worth waiting for someone from StreamNative to confirm. *From the Flink mailing list archive, compiled by volunteers

    2021-12-01 15:18:47