开发者社区> 问答> 正文

Spark结构化流媒体:等待终止,如何写流

从Kafka主题读取偏移并将其写入aerospike数据库。目前我正在准备这个工作生产准备和实施SparkListener。
在浏览文档的过程中,我偶然发现了这个例子:

StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
执行此代码后,流式计算将在后台启动。查询对象是该活动流式查询的句柄,我们决定使用awaitTermination()等待查询终止,以防止进程在查询处于活动状态时退出。

我知道它在终止进程之前等待查询完成。
这究竟是什么意思?它有助于避免查询写入的数据丢失。

当查询每天写入数百万条记录时,它有什么用?

我的代码看起来很简单:

dataset

    .writeStream()
    .option("startingOffsets", "earliest")
    .outputMode(OutputMode.Append())
    .format("console")
    .foreach(sink)
    .trigger(Trigger.ProcessingTime(triggerInterval))
    .option("checkpointLocation", checkpointLocation)
    .start();

展开
收起
社区小助手 2018-12-05 13:29:52 1891 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    由于查询是在后台启动的,没有明确的阻塞指令,您的代码只会到达main函数的末尾并立即退出。

    没有用。它确保查询完全执行。

    2019-07-17 23:18:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载