I am using Structured Streaming to read data from Kafka:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("enable.auto.commit", false)
  .option("auto.offset.reset", "earliest")
  .option("group.id", UUID.randomUUID().toString)
  .option("subscribe", "test")
  .load()
Then I try to join it with a Cassandra table:
val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))
  .rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
  .toDS()
  .writeStream
  .format("console") // <-- use ConsoleSink
  .option("truncate", false)
  .option("numRows", 10)
  .trigger(Trigger.ProcessingTime(5 seconds))
  .queryName("rate-console")
  .start
  .awaitTermination()
But when I try to convert the DataFrame to an RDD, I get the following error. Any idea why?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
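The missing parentheses on start are not the cause: in Scala 2, .start and .start() are equivalent. The exception is thrown earlier, by the .rdd call. Calling .rdd on a streaming DataFrame makes Spark plan the query as a batch job (note checkForBatch in the stack trace), and a query with a streaming source cannot be executed that way. Because joinWithCassandraTable is an RDD operation, it cannot be applied directly to the stream. One way around this is to load the Cassandra table as a static DataFrame and use a stream-static join, so the query stays in the DataFrame API all the way to writeStream.start():
val nlp = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "analytics", "table" -> "nlp2"))
  .load()
  .select("url", "ner", "sentiment")

// The select is kept as in the question; adjust it if the join key is a field inside the parsed JSON.
val query = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))
  .join(nlp, "url") // stream-static inner join, no .rdd involved
  .writeStream
  .format("console") // <-- use ConsoleSink
  .option("truncate", false)
  .option("numRows", 10)
  .trigger(Trigger.ProcessingTime(5 seconds))
  .queryName("rate-console")
  .start()

query.awaitTermination()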
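If the RDD-based joinWithCassandraTable is specifically what you want, another option is foreachBatch (available from Spark 2.4). Each micro-batch handed to foreachBatch is an ordinary, non-streaming DataFrame, so calling .rdd on it is allowed. The following is only a sketch under several assumptions that are not in the question: the Kafka messages are JSON objects with a top-level "url" field, Cassandra runs on localhost, "url" is the partition key of analytics.nlp2, and the spark-cassandra-connector and spark-sql-kafka packages are on the classpath. The object name and the joinBatch helper are made up for illustration. The consumer-style options from the question (group.id, auto.offset.reset, enable.auto.commit) are replaced with startingOffsets, because the Kafka source manages its own consumer group and offsets.

```scala
import com.datastax.spark.connector._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object}
import org.apache.spark.sql.streaming.Trigger

object KafkaCassandraJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-cassandra-join")
      // Assumption: Cassandra is reachable on localhost.
      .config("spark.cassandra.connection.host", "localhost")
      .getOrCreate()
    import spark.implicits._

    val urls = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest") // replaces auto.offset.reset
      .load()
      // Assumption: each message is a JSON object with a top-level "url" field.
      .select(get_json_object(col("value").cast("string"), "$.url").alias("url"))

    // Runs once per micro-batch; `batch` is a plain DataFrame, so .rdd is legal here.
    def joinBatch(batch: DataFrame, batchId: Long): Unit = {
      val joined = batch.as[String].rdd
        .filter(_ != null)  // drop messages without a url
        .map(Tuple1(_))     // the connector expects a tuple or case class as the key
        .joinWithCassandraTable[(String, String, String)](
          "analytics", "nlp2",
          SomeColumns("url", "ner", "sentiment"),
          SomeColumns("url"))
      // Print up to 10 matched Cassandra rows, roughly what the console sink would show.
      joined.map(_._2).take(10).foreach(println)
    }

    val query = urls.writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      // Passing a named method avoids the overload ambiguity that lambdas can hit on Scala 2.12.
      .foreachBatch(joinBatch _)
      .start()

    query.awaitTermination()
  }
}
```

Compared with a DataFrame join, this keeps the connector's per-key lookup against Cassandra, at the cost of handling the output yourself inside the batch function instead of using a built-in sink.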