我正在从目录中读取csv文件并进行一些处理。现在flink只选择该目录中的任何新文件并对其进行处理。这对我来说很好。
我陷入了两个问题:
我想记录flink已完成处理的文件名。
我想在flink完成处理后立即将处理过的文件移动到不同的文件夹。
我的代码片段是:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);
RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());
DataStream inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
parseInt(folderLookupTime));
这个主题在flink邮件列表上出现了几次 - 请参阅此处和此处的讨论- 但简短的总结是,在Flink中还没有一种简单的方法可以做到这一点。
似乎通常做的是使用cron作业定期将旧文件移出被监视的目录,并假设它们已被处理。如果你想要更加小心,那么你将不得不实现自己的机制来跟踪进行处理的作业的进度。上面提到的电子邮件主题包括如何做到这一点的一些想法。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。