如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。 具体报错信息如下:
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter. ( HadoopRecoverableWriter.java:61) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem .createRecoverableWriter(HadoopFileSystem.java:210) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem .createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) at org.apache.flink.streaming.api.functions.sink.filesystem. StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink .java:260) at org.apache.flink.streaming.api.functions.sink.filesystem. StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270) at org.apache.flink.streaming.api.functions.sink.filesystem. StreamingFileSink.initializeState(StreamingFileSink.java:412) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils .tryRestoreFunction(StreamingFunctionUtils.java:185) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils .restoreFunctionState(StreamingFunctionUtils.java:167) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator .initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler .initializeOperatorState(StreamOperatorStateHandler.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator .initializeState(AbstractStreamOperator.java:264) at org.apache.flink.streaming.runtime.tasks.OperatorChain .initializeStateAndOpenOperators(OperatorChain.java:400) at org.apache.flink.streaming.runtime.tasks.StreamTask .lambda$beforeInvoke$2(StreamTask.java:507) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1 .runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke( StreamTask.java:501) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask .java:531) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.lang.Thread.run(Thread.java:748) *来自志愿者整理的flink邮件归档
很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。
如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API
可以参考各个sink端的写法。 *来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。