开发者社区> 问答> 正文

flink 1.8 内的StreamExecutionEnvironment 不兼容问题

我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~ 

问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么?  

相关的代码描述如下 

StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑 

if (inputFormat instanceof FileInputFormat) { 

@SuppressWarnings("unchecked") 

FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; 

source = createFileInput(format, typeInfo, "Custom File source", 

FileProcessingMode.PROCESS_ONCE, -1); 

} else { 

source = createInput(inputFormat, typeInfo, "Custom Source"); 

return source; 

createFileInput 方法内 使用 ContinuousFileMonitoringFunction 对 inputFormat 进行处理,在其构造函数中,对 FileInputFormat<OUT> format 进行了一些条件约束 

Preconditions.checkArgument( 

format.getFilePaths().length == 1, 

"FileInputFormats with multiple paths are not supported yet."); 

这里就将 FileInputFormat 限制为只能添加一个file path。 

问题2: 在flink 1.10 版本情况是否有改善?(在 FileInputFormat.supportsMultiPaths 方法中我看到flink 2.0 中,所有的FileInputFormat 都会支持多路径) 

/** 

  • Override this method to supports multiple paths. 

  • When this method will be removed, all FileInputFormats have to support multiple paths. 

  • @return True if the FileInputFormat supports multiple paths, false otherwise. 

  • @deprecated Will be removed for Flink 2.0. 

*/ 

@Deprecated 

public boolean supportsMultiPaths() { 

return false; 

}*来自志愿者整理的flink邮件归档

展开
收起
玛丽莲梦嘉 2021-12-02 16:23:09 465 0
1 条回答
写回答
取消 提交回答
  • 你的需求是什么?下列哪种?  - 1.想用unbounded source,continuous的file source,监控文件夹,发送新文件,且需要支持多文件夹  - 2.只是想用bounded的input format,需要支持多文件 

    如果是1,现在仍然不支持。  如果是2,那你可以用env.addSource(new InputFormatSourceFunction(..)...)来支持多文件。*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:19:49
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载