FlinkSQL写入对象存储S3报错"Use persist() to create a persistent"

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FlinkSQL写入对象存储S3报错"java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point."

通过FlinkSQL创建从MySQL读取数据写入S3的任务,没有设置checkpoint,然后报错无法写入,具体错误如下:

2023-11-01 16:10:41
java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:362)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:334)
Caused by: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
    at org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
    at org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:647)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
    at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:359)
    at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)

报错也很明显,需要强制设置checkpoint,由于是不需要恢复的任务打算简单配置checkpoint保存在内存中,修改flink-conf.yaml,参考https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/state_backends/#memorystatebackend:

state.backend: hashmap
state.checkpoint-storage: jobmanager
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
4月前
|
Java Serverless 数据库连接
函数计算操作报错合集之调用打包的OSS函数时发生报错,该怎么办
Serverless 应用引擎(SAE)是阿里云提供的Serverless PaaS平台,支持Spring Cloud、Dubbo、HSF等主流微服务框架,简化应用的部署、运维和弹性伸缩。在使用SAE过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
分布式计算 DataWorks 数据处理
MaxCompute操作报错合集之UDF访问OSS,配置白名单后出现报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之在读取OSS遇到格式报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL DataWorks 数据可视化
DataWorks操作报错合集之测试OSS数据源的连通性时,出现503 Service Temporarily Unavailable的错误,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
分布式计算 DataWorks API
DataWorks操作报错合集之在将ODPS空间设置成保护模式后,导出到OSS的任务出现了权限问题,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
173 1
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
缓存 Java 对象存储
配置自己的 阿里OSS 环境,但在引入自己创建的依赖时出现报错。
配置自己的 阿里OSS 环境,但在引入自己创建的依赖时出现报错。
|
4月前
|
分布式计算 DataWorks 数据管理
DataWorks操作报错合集之使用OSS读取CSV文件到ODPS时遇到报错,一般是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
存储 缓存 DataWorks
DataWorks操作报错合集之配置项目连通oss数据源 , 报The request signature we calculated does not match the signature you provided.如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。