我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~
问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么?
相关的代码描述如下
StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
if (inputFormat instanceof FileInputFormat) {
@SuppressWarnings("unchecked")
FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
source = createFileInput(format, typeInfo, "Custom File source",
FileProcessingMode.PROCESS_ONCE, -1);
} else {
source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;
createFileInput 方法内 使用 ContinuousFileMonitoringFunction 对 inputFormat 进行处理,在其构造函数中,对 FileInputFormat<OUT> format 进行了一些条件约束
Preconditions.checkArgument(
format.getFilePaths().length == 1,
"FileInputFormats with multiple paths are not supported yet.");
这里就将 FileInputFormat 限制为只能添加一个file path。
问题2: 在flink 1.10 版本情况是否有改善?(在 FileInputFormat.supportsMultiPaths 方法中我看到flink 2.0 中,所有的FileInputFormat 都会支持多路径)
/**
Override this method to supports multiple paths.
When this method will be removed, all FileInputFormats have to support multiple paths.
*
*
*/
@Deprecated
public boolean supportsMultiPaths() {
return false;
}*来自志愿者整理的flink邮件归档
你的需求是什么?下列哪种? - 1.想用unbounded source,continuous的file source,监控文件夹,发送新文件,且需要支持多文件夹 - 2.只是想用bounded的input format,需要支持多文件
如果是1,现在仍然不支持。 如果是2,那你可以用env.addSource(new InputFormatSourceFunction(..)...)来支持多文件。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。