我想使用Google Cloud Storage来编写(汇总)DataStream流媒体作业中的元素StreamingFileSink。
为此,我使用了用于Hadoop的Google Cloud Storage连接器作为其实现org.apache.hadoop.fs.FileSystem,并用HadoopFileSystem作为org.apache.flink.core.fs.FileSystem Flink 包装hadoop FileSystem类的实现。
我在gradle文件中包含以下依赖项:
compile("com.google.cloud.bigdataoss:gcs-connector:1.9.4-hadoop2")
compile("org.apache.flink:flink-connector-filesystem_2.11:1.6.0")
provided("org.apache.flink:flink-shaded-hadoop2:1.6.0")
现在,从我理解的源代码[1] [2] [3],Flink动态加载FileSystemFactory运行时的实现(via java.util.ServiceLoader),并HadoopFsFactory在运行时加载(通过反射,如果它在类路径中找到Hadoop)它然后用于创建实例FileSystem。
我遇到的问题是RecoverableWriterHadoop兼容包的默认值仅支持hdfs文件方案(我使用gs),因此在运行时抛出错误。
所以,我extended的HadoopFileSystem(我叫GCSFileSystem),并@overrided在FileSystem#createRecoverableWriter()返回一个自定义实现的RecoverableWriter,然后处理回收等细节,并创建相应的FileSystemFactory类(类装饰用@AutoService,因此应通过发现ServiceLoader)。
该设置在本地和本地docker集群上运行良好(实际上GCS连接器由于缺少授权而引发错误,但这很好,因为它意味着FileSystem已加载并正在运行)但是当我将其部署到运行的docker集群时它会失败在Google Compute Engine上。
在GCE上,默认值HadoopFileSystem被加载并抛出异常,因为方案是gs和不是hdfs,但我的假设是它应该加载我的工厂实现,因此不应该出现这个错误。
我在Flink v1.6.0上并使用docker-flink在Docker上运行长时间运行的会话群集
我在一个长期生活会话集群运行,当我job.jar执行时,FileSystem初始化已经完成并且工厂已经加载了!因此,当我添加我的作业时,没有进行初始化调用。
解决方案?根据您的工作方式,有几种方法:
独立:将包含FileSystem实现的jar添加到lib/目录中
Cluster(manual):将包含FileSystem实现的jar添加到lib/您zip或图像的目录或其他任何内容。
Cluster(docker)(long-living):创建自定义容器图像并将jar添加到该lib/图像的目录中。
Cluster(docker)(per-job-session):创建一个自定义容器映像并将所有jar(包含FileSystem和你的作业等)添加到lib/目录中。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。