Flink 执行checkpoint报错-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Flink 执行checkpoint报错

冷丰 2019-01-04 15:50:33 3388

flink 程序不开启checkpoint运行正常,
开启checkpoint报:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 18 for operator orderConnectGeohashFunction -> (Filter, Filter -> Flat Map) (4/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 18 for operator orderConnectGeohashFunction -> (Filter, Filter -> Flat Map) (4/4).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.NotSerializableException: org.apache.flink.runtime.state.heap.HeapValueState
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.NotSerializableException: org.apache.flink.runtime.state.heap.HeapValueState
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:520)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.write(TypeSerializerSerializationUtil.java:350)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializer(TypeSerializerSerializationUtil.java:64)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:154)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentWriterImpl.writeStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:182)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:124)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:801)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more

对应代码 orderKeyedStream.connect(geohashKeyedStream)

  .process(new orderConnectGeohashFunction).uid("orderConnectGeohashFunction")

process里面是个有状态的函数.

其中用到了ValueState

  case class OrderState(geoHash: String, lat: Double, lon: Double, var gridList: String, var isEdge: Int)
  var valueState: ValueState[OrderState] = _
流计算
分享到
取消 提交回答
全部回答(1)
  • 徐前进
    2019-07-17 23:24:36

    如图你得OrderState应该没有描述成Flink所能接受的对象类型。你可以尝试在open方法中创建声明和描述:
    valueState= getRuntimeContext.getState(

      new ValueStateDescriptor[OrderState]("average", createTypeInformation[OrderState])

    )

    0 0
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题