如何在Flink中为Google Cloud Storage创建RecoverableWriter-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

如何在Flink中为Google Cloud Storage创建RecoverableWriter

2018-12-10 11:42:35 2032 1

我想使用Google Cloud Storage来编写(汇总)DataStream流媒体作业中的元素StreamingFileSink。

为此,我使用了用于Hadoop的Google Cloud Storage连接器作为其实现org.apache.hadoop.fs.FileSystem,并用HadoopFileSystem作为org.apache.flink.core.fs.FileSystem Flink 包装hadoop FileSystem类的实现。

我在gradle文件中包含以下依赖项:

compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")
现在,从我理解的源代码[1] [2] [3],Flink动态加载FileSystemFactory运行时的实现(via java.util.ServiceLoader),并HadoopFsFactory在运行时加载(通过反射,如果它在类路径中找到Hadoop)它然后用于创建实例FileSystem。

我遇到的问题是RecoverableWriterHadoop兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误。

所以,我extended的HadoopFileSystem(我叫GCSFileSystem),并@overrided在FileSystem#createRecoverableWriter()返回一个自定义实现的RecoverableWriter,然后处理回收等细节,并创建相应的FileSystemFactory类(类装饰用@AutoService,因此应通过发现ServiceLoader)。

该设置在本地和本地docker集群上运行良好(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为它意味着FileSystem已加载并正在运行)但是当我将其部署到运行的docker集群时它会失败在Google Compute Engine上。

在GCE上,默认值HadoopFileSystem被加载并抛出异常,因为方案是gs和不是hdfs,但我的假设是它应该加载我的工厂实现,因此不应该出现这个错误。

我在Flink v1.6.0上并使用docker-flink在Docker上运行长时间运行的会话群集

取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:10

    我在一个长期生活会话集群运行,当我job.jar执行时,FileSystem初始化已经完成并且工厂已经加载了!因此,当我添加我的作业时,没有进行初始化调用。

    解决方案?根据您的工作方式,有几种方法:

    独立:将包含FileSystem实现的jar添加到lib/目录中

    Cluster(manual):将包含FileSystem实现的jar添加到lib/您zip或图像的目录或其他任何内容。

    Cluster(docker)(long-living):创建自定义容器图像并将jar添加到该lib/图像的目录中。

    Cluster(docker)(per-job-session):创建一个自定义容器映像并将所有jar(包含FileSystem和你的作业等)添加到lib/目录中。

    0 0
相关问答

0

回答

flink sql怎么持久化?

2022-07-18 16:51:26 118浏览量 回答数 0

1

回答

flink1.11 lib下引入flink-sql-connector-mysql-cdc-1.0

2022-07-08 11:27:35 1344浏览量 回答数 1

1

回答

flink创建HBase维度表的方法是什么?

2021-12-07 20:26:31 136浏览量 回答数 1

1

回答

Flink中的RocksDB的术语Compaction具体是什么?

2021-12-09 14:07:21 269浏览量 回答数 1

1

回答

flink sql是否支持动态创建sink table?

2021-12-02 16:50:35 403浏览量 回答数 1

1

回答

Apache Flink在创建指出叫Stratosphere,项目的目标是什么?

2021-11-12 18:46:47 489浏览量 回答数 1

1

回答

如何理解实时计算Flink版中的智能调优 Autopilot?

2021-11-10 13:13:51 395浏览量 回答数 1

1

回答

flink sql 支持 视图吗?

2020-04-20 22:05:29 1429浏览量 回答数 1

3

回答

flink checkpoint 在 window 操作下 全局配置失效的问题。

2018-11-09 12:13:37 7066浏览量 回答数 3

1

回答

创建镜像仓库如何解绑GITHUB账户

2016-05-24 11:49:39 3001浏览量 回答数 1
+关注
flink小助手
flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。
0
文章
377
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载