我开始使用spark结构化流媒体。
我通过waterMark从kafka主题(startOffset:latest)获取readStream,按窗口持续时间分组事件时间,并写入kafka主题。
我的问题是,如何在spark结构化流媒体作业之前处理写入kafka主题的数据?
我一开始尝试使用`startOffset:earliest'。但是kafka主题中的数据太大,因此由于yarn超时而未启动spark流式处理。(即使我增加超时值)
1.如果我只是创建批处理作业并按特定数据范围进行筛选。结果没有反映在spark流的当前状态,结果的一致性和准确性似乎存在问题。
我试图重置检查点目录,但它没有工作。
如何处理旧数据和大数据?
你可以尝试parmeter maxOffsetsPerTrigger对卡夫卡+结构化数据流从卡夫卡接收旧数据。将此参数的值设置为您希望一次从Kafka接收的记录数。
使用:
sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test-name")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.option("group.id", "2")
.option("auto.offset.reset", "earliest")
.load()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。