FlinkSQL写入对象存储S3报错"Use persist() to create a persistent"

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FlinkSQL写入对象存储S3报错"java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point."

通过FlinkSQL创建从MySQL读取数据写入S3的任务,没有设置checkpoint,然后报错无法写入,具体错误如下:

2023-11-01 16:10:41
java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.completeProcessing(SourceStreamTask.java:362)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:334)
Caused by: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
    at org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
    at org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:647)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
    at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:359)
    at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)

报错也很明显,需要强制设置checkpoint,由于是不需要恢复的任务打算简单配置checkpoint保存在内存中,修改flink-conf.yaml,参考https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/state_backends/#memorystatebackend:

state.backend: hashmap
state.checkpoint-storage: jobmanager
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
12月前
|
存储 对象存储
使用Ceph对象存储的Amazon S3接口(基于nautilus版本)
使用Ceph对象存储的Amazon S3接口(基于nautilus版本)
386 0
|
14天前
|
监控 网络安全 PHP
对象存储oss使用问题之操作报错:Unable to execute HTTP request: SocketException如何解决
《对象存储OSS操作报错合集》精选了用户在使用阿里云对象存储服务(OSS)过程中出现的各种常见及疑难报错情况,包括但不限于权限问题、上传下载异常、Bucket配置错误、网络连接问题、跨域资源共享(CORS)设定错误、数据一致性问题以及API调用失败等场景。为用户降低故障排查时间,确保OSS服务的稳定运行与高效利用。
98 0
|
14天前
|
存储 API 开发工具
对象存储oss使用问题之操作报错:The request signature we calculated does not match the signature you provide如何解决
《对象存储OSS操作报错合集》精选了用户在使用阿里云对象存储服务(OSS)过程中出现的各种常见及疑难报错情况,包括但不限于权限问题、上传下载异常、Bucket配置错误、网络连接问题、跨域资源共享(CORS)设定错误、数据一致性问题以及API调用失败等场景。为用户降低故障排查时间,确保OSS服务的稳定运行与高效利用。
105 0
|
14天前
|
运维 API 开发工具
对象存储oss使用问题之获取临时访问凭证报错:It is not a map value.如何解决
《对象存储OSS操作报错合集》精选了用户在使用阿里云对象存储服务(OSS)过程中出现的各种常见及疑难报错情况,包括但不限于权限问题、上传下载异常、Bucket配置错误、网络连接问题、跨域资源共享(CORS)设定错误、数据一致性问题以及API调用失败等场景。为用户降低故障排查时间,确保OSS服务的稳定运行与高效利用。
33 0
|
14天前
|
存储 移动开发 前端开发
对象存储oss使用问题之OSS SDK .net 使用下载例程报错如何解决
《对象存储OSS操作报错合集》精选了用户在使用阿里云对象存储服务(OSS)过程中出现的各种常见及疑难报错情况,包括但不限于权限问题、上传下载异常、Bucket配置错误、网络连接问题、跨域资源共享(CORS)设定错误、数据一致性问题以及API调用失败等场景。为用户降低故障排查时间,确保OSS服务的稳定运行与高效利用。
26 0
|
2月前
|
分布式计算 DataWorks 关系型数据库
DataWorks报错问题之使用oss读取csv文件至odps 报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
3月前
|
SQL 分布式计算 DataWorks
maxcompute配置问题之连接oss报错如何解决
MaxCompute配置是指在使用阿里云MaxCompute服务时对项目设置、计算资源、存储空间等进行的各项调整;本合集将提供MaxCompute配置的指南和建议,帮助用户根据数据处理需求优化其MaxCompute环境。
29 0
|
12月前
|
存储 文件存储 对象存储
带你读《存储漫谈:Ceph原理与实践》——3.2.1 对象存储和 S3
带你读《存储漫谈:Ceph原理与实践》——3.2.1 对象存储和 S3
|
12月前
|
人工智能 JavaScript 前端开发
MinIO:基于Go实现的高性能、兼容S3协议的对象存储
MinIO:基于Go实现的高性能、兼容S3协议的对象存储
579 0
|
12月前
|
XML 对象存储 Swift
Ceph对象存储的Amazon S3接口的使用(重点介绍分片上传接口)(基于nautilus版本)
Ceph对象存储的Amazon S3接口的使用(重点介绍分片上传接口)(基于nautilus版本)
531 0