开发者社区> 问答> 正文

(Spark结构化流媒体)如何处理kafka主题中的旧数据

社区小助手 2018-12-06 15:12:10 386

我开始使用spark结构化流媒体。

我通过waterMark从kafka主题(startOffset:latest)获取readStream,按窗口持续时间分组事件时间,并写入kafka主题。

我的问题是,如何在spark结构化流媒体作业之前处理写入kafka主题的数据?

我一开始尝试使用`startOffset:earliest'。但是kafka主题中的数据太大,因此由于yarn超时而未启动spark流式处理。(即使我增加超时值)

1.如果我只是创建批处理作业并按特定数据范围进行筛选。结果没有反映在spark流的当前状态,结果的一致性和准确性似乎存在问题。

我试图重置检查点目录,但它没有工作。
如何处理旧数据和大数据?

分享到
取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:18:32

    你可以尝试parmeter maxOffsetsPerTrigger对卡夫卡+结构化数据流从卡夫卡接收旧数据。将此参数的值设置为您希望一次从Kafka接收的记录数。

    使用:

    sparkSession.readStream

      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-name")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 1)
      .option("group.id", "2")
      .option("auto.offset.reset", "earliest")
      .load()
    0 0
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程