通过FlinkSQL创建从MySQL读取数据写入S3的任务,没有设置checkpoint,然后报错无法写入,具体错误如下:
2023-11-01 16:10:41
java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:362)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:334)
Caused by: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
at org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
at org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:647)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:574)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:359)
at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
报错也很明显,需要强制设置checkpoint,由于是不需要恢复的任务打算简单配置checkpoint保存在内存中,修改flink-conf.yaml,参考https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/state_backends/#memorystatebackend:
state.backend: hashmap
state.checkpoint-storage: jobmanager