开发者社区> 问答> 正文

如何在Flink中为Google Cloud Storage创建RecoverableWriter

flink小助手 2018-12-10 11:42:35 504

我想使用Google Cloud Storage来编写(汇总)DataStream流媒体作业中的元素StreamingFileSink。

为此,我使用了用于Hadoop的Google Cloud Storage连接器作为其实现org.apache.hadoop.fs.FileSystem,并用HadoopFileSystem作为org.apache.flink.core.fs.FileSystem Flink 包装hadoop FileSystem类的实现。

我在gradle文件中包含以下依赖项:

compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")
现在,从我理解的源代码[1] [2] [3],Flink动态加载FileSystemFactory运行时的实现(via java.util.ServiceLoader),并HadoopFsFactory在运行时加载(通过反射,如果它在类路径中找到Hadoop)它然后用于创建实例FileSystem。

我遇到的问题是RecoverableWriterHadoop兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误。

所以,我extended的HadoopFileSystem(我叫GCSFileSystem),并@overrided在FileSystem#createRecoverableWriter()返回一个自定义实现的RecoverableWriter,然后处理回收等细节,并创建相应的FileSystemFactory类(类装饰用@AutoService,因此应通过发现ServiceLoader)。

该设置在本地和本地docker集群上运行良好(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为它意味着FileSystem已加载并正在运行)但是当我将其部署到运行的docker集群时它会失败在Google Compute Engine上。

在GCE上,默认值HadoopFileSystem被加载并抛出异常,因为方案是gs和不是hdfs,但我的假设是它应该加载我的工厂实现,因此不应该出现这个错误。

我在Flink v1.6.0上并使用docker-flink在Docker上运行长时间运行的会话群集

分布式计算 Hadoop 流计算 Docker 容器
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:19:10

    我在一个长期生活会话集群运行,当我job.jar执行时,FileSystem初始化已经完成并且工厂已经加载了!因此,当我添加我的作业时,没有进行初始化调用。

    解决方案?根据您的工作方式,有几种方法:

    独立:将包含FileSystem实现的jar添加到lib/目录中

    Cluster(manual):将包含FileSystem实现的jar添加到lib/您zip或图像的目录或其他任何内容。

    Cluster(docker)(long-living):创建自定义容器图像并将jar添加到该lib/图像的目录中。

    Cluster(docker)(per-job-session):创建一个自定义容器映像并将所有jar(包含FileSystem和你的作业等)添加到lib/目录中。

    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题