20/11/20 10:42:00 INFO operators.AbstractStreamOperator: Could not complete snapshot 1 for operator Window(EventTimeSessionWindows(15000), EventTimeTrigger, EventSessionWindowFunction) -> (Timestamps/Watermarks, Timestamps/Watermarks) (4/4).
java.lang.Exception: Could not write timer service of Window(EventTimeSessionWindows(15000), EventTimeTrigger, EventSessionWindowFunction) -> (Timestamps/Watermarks, Timestamps/Watermarks) (4/4) to checkpoint state stream.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:466)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
    at org.apache.flink.runtime.state.NonClosingCheckpointOutputStream.write(NonClosingCheckpointOutputStream.java:61)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:151)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$InternalTimersSnapshotWriterV2.writeKeyAndNamespaceSerializers(InternalTimersSnapshotReaderWriters.java:200)
    at org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters$AbstractInternalTimersSnapshotWriter.writeTimersSnapshot(InternalTimersSnapshotReaderWriters.java:117)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.write(InternalTimerServiceSerializationProxy.java:101)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.snapshotStateForKeyGroup(InternalTimeServiceManager.java:139)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:462)
    ... 20 more
Caused by: java.io.IOException: Mkdirs failed to create file:/app/aiops/message/flink/happit/cn/checkpoint/a6fe8d9cae18aa8f6353d39ab566ed78/chk-1
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
    at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
    ... 32 more
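This error means that a checkpoint failed in an Apache Flink streaming job. Specifically, Flink could not write the timer-service state of a window operator (the event-time session window, subtask 4/4) to the checkpoint state stream, so checkpoint 1 failed. The root cause, reported in the last "Caused by", is that Flink could not create the directory that should hold the checkpoint data at the configured path (Mkdirs failed to create file:/app/aiops/message/flink/happit/cn/checkpoint/a6fe8d9cae18aa8f6353d39ab566ed78/chk-1).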
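Reading a long trace like this comes down to following the "Caused by" chain to its last entry. A minimal Python sketch (the helper names are hypothetical, and it assumes the trace has one frame or cause per line, as Java normally prints it) that lists the chain and extracts the path from the "Mkdirs failed" message:

```python
import re
from typing import List, Optional

# Top-level exception line, e.g. "java.lang.Exception: Could not write ..."
TOP_EXCEPTION = re.compile(r"[\w.$]+(Exception|Error)(: .*)?$")
# Message used in the question's trace when a checkpoint directory cannot be created
MKDIRS_FAILED = re.compile(r"Mkdirs failed to create (\S+)")


def cause_chain(trace: str) -> List[str]:
    """Return exception messages outermost first; the last one is the root cause."""
    chain: List[str] = []
    for raw in trace.splitlines():
        line = raw.strip()
        if line.startswith("Caused by: "):
            chain.append(line[len("Caused by: "):])
        elif not chain and TOP_EXCEPTION.match(line):
            # Only the first top-level exception line starts the chain.
            chain.append(line)
    return chain


def failed_mkdirs_path(trace: str) -> Optional[str]:
    """Return the path from a 'Mkdirs failed to create <path>' cause, if any."""
    m = MKDIRS_FAILED.search(trace)
    return m.group(1) if m else None
```

For the trace in the question, the last element of the chain is the "Mkdirs failed" message, and failed_mkdirs_path returns the file: path ending in chk-1, which is the directory to investigate in the steps below.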
You can try the following steps to resolve it:
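Check directory permissions: make sure the checkpoint directory configured for Flink (in this case file:/app/aiops/message/flink/happit/cn/checkpoint/a6fe8d9cae18aa8f6353d39ab566ed78/) is readable and writable by the user that runs the Flink job. Flink creates the job-ID subdirectory and the chk-1 directory itself, so that user must be allowed to create directories under /app/aiops/message/flink/happit/cn/checkpoint/. You can inspect the permissions with a command such as `ls -ld /app/aiops/message/flink/happit/cn/checkpoint/` and adjust them with `chmod` or `chown` if needed.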
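The permission check can also be scripted. A minimal sketch, assuming Python 3 on the Linux host where the TaskManager runs, executed as the same user account as Flink; the function names are hypothetical. It walks up to the nearest existing ancestor of the failing path, reports its owner and mode, and then tries to create and remove a throwaway subdirectory, which is the same kind of operation that failed in the log:

```python
import os
import pwd
import stat
import sys
import uuid


def nearest_existing_dir(path):
    """Walk up from `path` until an existing directory is found."""
    path = os.path.abspath(path)
    while not os.path.isdir(path):
        parent = os.path.dirname(path)
        if parent == path:  # reached "/" without finding a directory
            break
        path = parent
    return path


def describe_dir(path):
    """Owner, mode and effective write access of `path` for the current user."""
    st = os.stat(path)
    try:
        owner = pwd.getpwuid(st.st_uid).pw_name
    except KeyError:  # uid without a passwd entry (common in containers)
        owner = str(st.st_uid)
    return {
        "path": path,
        "owner": owner,
        "mode": stat.filemode(st.st_mode),  # e.g. "drwxr-xr-x"
        # Creating entries in a directory needs write and search (x) permission.
        # root bypasses the mode bits, so run this as the Flink user.
        "writable": os.access(path, os.W_OK | os.X_OK),
    }


def can_mkdirs(path):
    """Create and remove a probe directory under the nearest existing ancestor."""
    base = nearest_existing_dir(path)
    probe = os.path.join(base, ".flink-probe-" + uuid.uuid4().hex)
    try:
        os.mkdir(probe)
    except OSError as exc:  # e.g. PermissionError, or ENOSPC on a full disk
        return False, f"{base}: {exc}"
    os.rmdir(probe)
    return True, ""


if __name__ == "__main__":
    # Pass the failing directory without the "file:" prefix, for example the chk-1 path.
    for target in sys.argv[1:]:
        print(describe_dir(nearest_existing_dir(target)))
        print(can_mkdirs(target))
```

Probing with a real mkdir rather than only reading mode bits also catches cases the bits do not show, such as a read-only mount.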
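Check disk space: confirm that the disk holding this directory has enough free space for the checkpoint data. If the disk is full, free up space or point the checkpoint directory at a disk with enough capacity.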
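Both free bytes and free inodes matter here: mkdir can also fail with "No space left on device" when the filesystem has run out of inodes, even if bytes remain. A small Python sketch, assuming a POSIX host (os.statvfs is not available on Windows); the helper names and the thresholds in looks_full are illustrative, not Flink settings:

```python
import os
import shutil


def space_report(path):
    """Free space and free inodes on the filesystem that holds `path`."""
    path = os.path.abspath(path)
    # The checkpoint directory may not exist yet; walk up ("/" always exists).
    while not os.path.exists(path):
        path = os.path.dirname(path)
    usage = shutil.disk_usage(path)
    vfs = os.statvfs(path)  # POSIX only
    return {
        "path": path,
        "free_bytes": usage.free,
        "used_pct": 100.0 * usage.used / usage.total if usage.total else 0.0,
        "free_inodes": vfs.f_favail,
        "total_inodes": vfs.f_files,  # some filesystems report 0 here
    }


def looks_full(report, min_free_bytes=1 << 30, min_free_inodes=10_000):
    """Heuristic check; the default thresholds are arbitrary examples."""
    low_space = report["free_bytes"] < min_free_bytes
    # Only judge inodes when the filesystem actually reports an inode count.
    low_inodes = report["total_inodes"] > 0 and report["free_inodes"] < min_free_inodes
    return low_space or low_inodes
```

Running space_report on the failing chk-1 path (without the "file:" prefix) reports on whichever filesystem actually holds the checkpoint directory, which is the same information `df -h` and `df -i` give.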
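Check the Flink configuration: in the Flink configuration file (flink-conf.yaml), make sure that `state.backend`, `state.checkpoints.dir`, and the other state-backend and checkpoint settings are correct. In particular, if you use the filesystem state backend (`state.backend: filesystem`), make sure it points at the correct directory.
Restart the Flink cluster: transient file-system problems or lock contention can sometimes cause this kind of failure, and restarting the Flink cluster may help.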
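The configuration check described above can be partly scripted. A simplified sketch, assuming the legacy flat `key: value` format of flink-conf.yaml (normally found under conf/ in the Flink distribution); it is not Flink's own parser, the function names are hypothetical, and state backend settings made in the job code (for example through setStateBackend) override the file, so this only checks the cluster-wide defaults:

```python
from typing import Dict, List


def read_flink_conf(text: str) -> Dict[str, str]:
    """Simplified reader for flat 'key: value' lines in flink-conf.yaml."""
    conf: Dict[str, str] = {}
    for raw in text.splitlines():
        # Drop comments (this assumes no '#' appears inside a value).
        line = raw.split("#", 1)[0].strip()
        if ": " not in line:  # blank or malformed line
            continue
        key, value = line.split(": ", 1)
        conf[key.strip()] = value.strip()
    return conf


def checkpoint_conf_problems(conf: Dict[str, str]) -> List[str]:
    """Report which of the two checkpoint-related keys are missing or empty."""
    problems = []
    for key in ("state.backend", "state.checkpoints.dir"):
        if not conf.get(key):
            problems.append(f"{key} is not set")
    return problems


# Example use (the path is an assumption about a standard Flink layout):
# with open("conf/flink-conf.yaml") as f:
#     print(checkpoint_conf_problems(read_flink_conf(f.read())))
```

Splitting on ": " rather than ":" keeps URI values such as file:///app/... intact.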
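Dig into the logs: the logs of the Flink TaskManagers and the worker nodes may show why the directory could not be created, for example whether another process is holding the resource or whether there is a network problem.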
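When several TaskManagers write logs, it helps to see which checkpoint IDs and subtasks failed and which paths could not be created. A minimal Python sketch, assuming the log lines look like the ones in the question; the regular expressions and the function name are illustrative and may need adjusting for other Flink versions or log formats. Each TaskManager writes its own log on its own host, so run it against the logs of every host:

```python
import re
import sys
from collections import defaultdict

# Patterns modeled on the log lines in the question.
SNAPSHOT_FAILED = re.compile(r"Could not complete snapshot (\d+) for operator .+? \((\d+)/(\d+)\)")
MKDIRS_FAILED = re.compile(r"Mkdirs failed to create (\S+)")


def scan_log(lines):
    """Map failed checkpoint id -> subtasks, and collect paths that could not be created."""
    failed = defaultdict(set)
    paths = set()
    for line in lines:
        m = SNAPSHOT_FAILED.search(line)
        if m:
            failed[int(m.group(1))].add(f"{m.group(2)}/{m.group(3)}")
        m = MKDIRS_FAILED.search(line)
        if m:
            paths.add(m.group(1))
    return dict(failed), paths


if __name__ == "__main__":
    # Usage: python scan_log.py <log file> [<log file> ...]
    for name in sys.argv[1:]:
        with open(name, errors="replace") as fh:
            print(name, *scan_log(fh))
```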
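Consider a distributed file system: if your use case allows it, store checkpoints on HDFS or another distributed file system, which usually gives better fault tolerance and scalability. A file: path is resolved on each machine's local disk, so on a multi-host cluster the directory must exist and be writable on every TaskManager host.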
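Whether a configured checkpoint URI is local or shared can be read from its scheme. A minimal Python sketch; the set of shared schemes is an assumption and not exhaustive, and a file: path can still be shared if it is, for example, an NFS mount present at the same path on every host. The example host name namenode:8020 in the usage below is hypothetical:

```python
from urllib.parse import urlparse

# Assumed, non-exhaustive set of schemes that usually point at storage shared by all hosts.
SHARED_SCHEMES = {"hdfs", "viewfs", "s3", "s3a", "s3p", "oss", "gs", "abfs", "abfss", "wasb", "wasbs"}


def checkpoint_storage_kind(uri: str) -> str:
    """Classify a state.checkpoints.dir value by its URI scheme."""
    scheme = urlparse(uri).scheme.lower()
    if scheme == "file":
        return "local"  # each host resolves this on its own disk
    if scheme in SHARED_SCHEMES:
        return "shared"
    if not scheme:
        return "default"  # resolved against Flink's fs.default-scheme
    return "unknown"
```

For instance, checkpoint_storage_kind("file:/app/aiops/message/flink/happit/cn/checkpoint") gives "local", while checkpoint_storage_kind("hdfs://namenode:8020/flink/checkpoints") gives "shared".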
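If none of these steps solves the problem, you may need to examine the job's configuration and runtime environment in more detail, or ask the Flink community and search its issue tracker for further help.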