大家好:
我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
java.lang.Exception: Could not perform checkpoint 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 550 for operator KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 (16/20).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
... 8 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
at org.apache.flink.runtime.state.PartitionableListState. (PartitionableListState.java:68)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
... 13 more*来自志愿者整理的flink邮件归档
Hi
你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator -> Flat Map -> Sink", 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator state "async_wait_operator_state"相关。最近fix的 https://issues.apache.org/jira/browse/FLINK-13063 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。