
Flink checkpoint concurrency problem

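Hi all,

I have a job that crashes and restarts at irregular intervals. The exception log is included below; broadly, the synchronous part of the checkpoint is failing.

The job checks whether the volume of data over the last half hour has reached a configured threshold. I use RocksDB as the state backend. In the middle of the pipeline there is a processElement method that accumulates the keyed messages in an internal MapState. In addition, for every incoming message it iterates over the whole MapState once to delete stale entries.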

Is this exception caused by the checkpoint asynchronously copying this state to RocksDB while I am deleting entries from the MapState inside the processElement method?
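For readers who want to picture the pattern described above, here is a minimal plain-Java sketch of it. It is not the poster's code and it does not use Flink: a `java.util.HashMap` stands in for the per-key MapState, and the class name `RecentCountSketch`, the method `onMessage`, and the message-id-to-timestamp layout are all hypothetical. Only the half-hour window and the "prune on every message, then compare against a threshold" logic come from the question.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for the keyed process function described in the question.
class RecentCountSketch {
    // "Last half hour" from the question, in milliseconds.
    static final long WINDOW_MS = 30 * 60 * 1000L;

    // messageId -> event timestamp; a plain map standing in for per-key MapState.
    private final Map<String, Long> seen = new HashMap<>();

    /** Stores the message, prunes stale entries, and reports whether the threshold is reached. */
    boolean onMessage(String messageId, long timestampMs, int threshold) {
        seen.put(messageId, timestampMs);

        // Remove through the iterator itself; calling seen.remove(...) inside
        // this loop would throw ConcurrentModificationException.
        long cutoff = timestampMs - WINDOW_MS;
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoff) {
                it.remove();
            }
        }
        return seen.size() >= threshold;
    }

    int size() {
        return seen.size();
    }

    public static void main(String[] args) {
        RecentCountSketch sketch = new RecentCountSketch();
        System.out.println(sketch.onMessage("m1", 0L, 2));
        System.out.println(sketch.onMessage("m2", 10 * 60 * 1000L, 2));
        System.out.println(sketch.onMessage("m3", 35 * 60 * 1000L, 3));
    }
}
```

Pruning on every message costs a full scan of the map each time. A timer-based cleanup is a common alternative, but whether it fits this job is not something the thread establishes.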

java.lang.Exception: Could not perform checkpoint 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
    ... 8 more
Caused by: java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
    at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
    at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
    at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
    ... 13 more

*Compiled by volunteers from the Flink mailing list archive

雪哥哥 2021-12-07 16:00:17
2 answers
  • Hi, has this problem been solved?

    2024-08-12 09:58:32
  • Hi

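    The code you posted still does not match your exception stack. As mentioned earlier, the state that fails is operator state, but your code only deals with keyed state. However, judging from the name of the failing operator, "KeyedProcess -> async wait operator -> Flat Map -> Sink", the use of StreamElementSerializer in the stack trace, and the symptoms of a consistency problem, my guess is that it is related to the operator state "async_wait_operator_state" in AsyncWaitOperator. The recently fixed issue https://issues.apache.org/jira/browse/FLINK-13063 is a temporary fix for exactly this consistency problem. You could cherry-pick that fix, redeploy your Flink job, and check whether the problem still occurs.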

    2021-12-07 16:28:23
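The innermost frames of the trace show `HashMap`'s iterator failing while Kryo's `MapSerializer.copy` walks a map inside a stream element. That is the failure `HashMap` reports when it is structurally modified after an iteration over it has started. The plain-Java sketch below reproduces that failure mode deterministically and shows one way to avoid it. It is not Flink code: the class `SnapshotCopyDemo`, the method `deepCopy`, and the `midCopy` hook are hypothetical, and the hook merely stands in for another thread touching the map during the copy. Whether the records in this job really carry a shared, mutable `HashMap` is an assumption that the thread does not confirm.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free illustration of "copy while someone else mutates".
class SnapshotCopyDemo {

    /**
     * Copies a map entry by entry, the way a generic map serializer would.
     * midCopy runs after each entry and simulates concurrent activity.
     */
    static Map<String, Long> deepCopy(Map<String, Long> source, Runnable midCopy) {
        Map<String, Long> copy = new HashMap<>();
        for (Map.Entry<String, Long> entry : source.entrySet()) {
            copy.put(entry.getKey(), entry.getValue());
            midCopy.run();
        }
        return copy;
    }

    static Map<String, Long> sampleMap() {
        Map<String, Long> map = new HashMap<>();
        map.put("a", 1L);
        map.put("b", 2L);
        map.put("c", 3L);
        return map;
    }

    /** The map being copied is also being mutated: the copy fails. */
    static boolean copyFailsWhenSharedMapIsMutated() {
        Map<String, Long> shared = sampleMap();
        try {
            deepCopy(shared, () -> shared.put("late-" + shared.size(), 0L));
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    /** The copier sees a private snapshot, so later mutations cannot disturb it. */
    static boolean copySucceedsOnPrivateSnapshot() {
        Map<String, Long> shared = sampleMap();
        Map<String, Long> snapshot = new HashMap<>(shared); // defensive copy handed downstream
        Map<String, Long> copy =
                deepCopy(snapshot, () -> shared.put("late-" + shared.size(), 0L));
        return copy.equals(snapshot) && copy.size() == 3;
    }

    public static void main(String[] args) {
        System.out.println("shared map fails: " + copyFailsWhenSharedMapIsMutated());
        System.out.println("private snapshot works: " + copySucceedsOnPrivateSnapshot());
    }
}
```

If the records do carry a mutable map, emitting a private copy instead of the live object is an application-level precaution that is independent of the Flink-side fix referenced in the answer.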