开发者社区> 问答> 正文

如何在Flink中为Google Cloud Storage创建RecoverableWriter

我想使用Google Cloud Storage来编写(汇总)DataStream流媒体作业中的元素StreamingFileSink。

为此,我使用了用于Hadoop的Google Cloud Storage连接器作为其实现org.apache.hadoop.fs.FileSystem,并用HadoopFileSystem作为org.apache.flink.core.fs.FileSystem Flink 包装hadoop FileSystem类的实现。

我在gradle文件中包含以下依赖项:

compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")
现在,从我理解的源代码[1] [2] [3],Flink动态加载FileSystemFactory运行时的实现(via java.util.ServiceLoader),并HadoopFsFactory在运行时加载(通过反射,如果它在类路径中找到Hadoop)它然后用于创建实例FileSystem。

我遇到的问题是RecoverableWriterHadoop兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误。

所以,我extended的HadoopFileSystem(我叫GCSFileSystem),并@overrided在FileSystem#createRecoverableWriter()返回一个自定义实现的RecoverableWriter,然后处理回收等细节,并创建相应的FileSystemFactory类(类装饰用@AutoService,因此应通过发现ServiceLoader)。

该设置在本地和本地docker集群上运行良好(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为它意味着FileSystem已加载并正在运行)但是当我将其部署到运行的docker集群时它会失败在Google Compute Engine上。

在GCE上,默认值HadoopFileSystem被加载并抛出异常,因为方案是gs和不是hdfs,但我的假设是它应该加载我的工厂实现,因此不应该出现这个错误。

我在Flink v1.6.0上并使用docker-flink在Docker上运行长时间运行的会话群集

展开
收起
flink小助手 2018-12-10 11:42:35 2557 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    我在一个长期生活会话集群运行,当我job.jar执行时,FileSystem初始化已经完成并且工厂已经加载了!因此,当我添加我的作业时,没有进行初始化调用。

    解决方案?根据您的工作方式,有几种方法:

    独立:将包含FileSystem实现的jar添加到lib/目录中

    Cluster(manual):将包含FileSystem实现的jar添加到lib/您zip或图像的目录或其他任何内容。

    Cluster(docker)(long-living):创建自定义容器图像并将jar添加到该lib/图像的目录中。

    Cluster(docker)(per-job-session):创建一个自定义容器映像并将所有jar(包含FileSystem和你的作业等)添加到lib/目录中。

    2019-07-17 23:19:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载