从Kafka主题读取偏移并将其写入aerospike数据库。目前我正在准备这个工作生产准备和实施SparkListener。
在浏览文档的过程中,我偶然发现了这个例子:
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
执行此代码后,流式计算将在后台启动。查询对象是该活动流式查询的句柄,我们决定使用awaitTermination()等待查询终止,以防止进程在查询处于活动状态时退出。
我知道它在终止进程之前等待查询完成。
这究竟是什么意思?它有助于避免查询写入的数据丢失。
当查询每天写入数百万条记录时,它有什么用?
我的代码看起来很简单:
dataset
.writeStream()
.option("startingOffsets", "earliest")
.outputMode(OutputMode.Append())
.format("console")
.foreach(sink)
.trigger(Trigger.ProcessingTime(triggerInterval))
.option("checkpointLocation", checkpointLocation)
.start();
由于查询是在后台启动的,没有明确的阻塞指令,您的代码只会到达main函数的末尾并立即退出。
没有用。它确保查询完全执行。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。