我在一个不断更新的文件夹中有一个csv文件。我需要从这个csv文件中获取输入并生成一些事务。如何从持续更新的csv文件中获取数据,每5分钟一次的话?
我试过以下:
val csvDF = spark
.readStream
.option("sep", ",")
.schema(userSchema)
.csv("file:///home/location/testFiles")
但问题是它是监视文件夹是否创建了任何新文件......但我的问题只是一个不断更新的文件。
tl; dr它不起作用。
默认情况下,Spark Structured Streaming监视目录中的文件,并且每个新文件都会触发计算。处理完文件后,将永远不再处理该文件。这是默认实现。
您可以编写自己的流式源,可以监视文件的变化,但这是一个自定义的源开发(在大多数情况下,这是不值得的努力)。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。