开发者社区> 问答> 正文

(Spark结构化流媒体)如何处理kafka主题中的旧数据

我开始使用spark结构化流媒体。

我通过waterMark从kafka主题(startOffset:latest)获取readStream,按窗口持续时间分组事件时间,并写入kafka主题。

我的问题是,如何在spark结构化流媒体作业之前处理写入kafka主题的数据?

我一开始尝试使用`startOffset:earliest'。但是kafka主题中的数据太大,因此由于yarn超时而未启动spark流式处理。(即使我增加超时值)

1.如果我只是创建批处理作业并按特定数据范围进行筛选。结果没有反映在spark流的当前状态,结果的一致性和准确性似乎存在问题。

我试图重置检查点目录,但它没有工作。
如何处理旧数据和大数据?

展开
收起
社区小助手 2018-12-06 15:12:10 2583 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    你可以尝试parmeter maxOffsetsPerTrigger对卡夫卡+结构化数据流从卡夫卡接收旧数据。将此参数的值设置为您希望一次从Kafka接收的记录数。

    使用:

    sparkSession.readStream

      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-name")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 1)
      .option("group.id", "2")
      .option("auto.offset.reset", "earliest")
      .load()
    2019-07-17 23:18:32
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载