Flink CheckPoint时,配置TTL的ValueState 状态报NullPointException。
20:11:11,729 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 13 by task d7aa78e851dd620b3e3742d2dd40e451 of job b5ee7a9090f8dbb19fe3e41eb63396ec at 82cef248-450b-4af2-b632-af6552becb44 @ 127.0.0.1 (dataPort=-1).
20:11:11,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 13 of job b5ee7a9090f8dbb19fe3e41eb63396ec.
java.lang.Exception: Could not materialize checkpoint 13 for operator Flat Map -> Sink: console-sink (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
遇到这个问题,看起来是在使用Apache Flink进行Checkpoint操作时,与自定义的状态(ValueState)和Time-to-Live (TTL)配置相关联的NullPointerException。这可能是由于在序列化或访问状态值时出现了空指针异常。以下是一些可能的解决步骤:
检查状态初始化:确保你的ValueState
在使用前已经被正确初始化。如果在访问状态之前没有通过getState()
方法获取并初始化状态,可能会导致空指针异常。
状态序列化问题:根据堆栈跟踪,异常发生在LongSerializer.serialize
方法中,这意味着可能是在尝试序列化一个null
的Long值。检查你的状态逻辑,确保在更新状态时不会存入null
值。Flink的状态后端对null
值的处理是有限制的,很多序列化器不支持null
值。
TTL逻辑检查:如果你为ValueState
配置了TTL,确认当TTL过期且状态被清除时,你的代码能够妥善处理可能的null
返回值。TTL过期可能导致状态被清空,之后访问该状态时如果没有相应的空值处理逻辑,就可能抛出空指针异常。
自定义序列化器:如果使用了自定义的序列化器,请确保它们正确处理了null
值的情况,并且实现了所有必要的方法。
日志和调试信息:增加更多的日志记录点,特别是在状态访问和更新的地方,以帮助定位问题发生的具体位置。
版本兼容性:确认使用的Flink版本与你依赖的所有库版本兼容,有时候版本不匹配也可能导致这类问题。
资源限制:虽然从提供的信息来看不太像资源问题,但偶尔资源不足或极端的GC行为也可能导致类似异常,确保任务管理器有足够的内存分配。
针对这个特定的问题,直接修改代码以避免将null
值存入状态,并确保状态访问前有正确的初始化逻辑,应该是解决问题的关键所在。