开发者社区> 问答> 正文

Spark结构化流媒体从Cassandra中丰富

我使用结构化流式传输来自Kafka的数据

val df = spark

.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("enable.auto.commit", false)
.option("auto.offset.reset", "earliest")
.option("group.id", UUID.randomUUID().toString)
.option("subscribe", "test")
.load()

然后尝试使用Cassandra表加入它

val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))

.rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
.toDS()
.writeStream
.format("console") // <-- use ConsoleSink
.option("truncate", false)
.option("numRows", 10)
.trigger(Trigger.ProcessingTime(5 seconds))
.queryName("rate-console")
.start
.awaitTermination()

但我得到,当我尝试将数据框转换为rdd时,任何想法为什么?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

展开
收起
社区小助手 2018-12-12 18:18:28 2485 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    正如错误消息所示,start需要用括号调用,如下所示:

    val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))

    .rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
    .toDS()
    .writeStream
    .format("console") // <-- use ConsoleSink
    .option("truncate", false)
    .option("numRows", 10)
    .trigger(Trigger.ProcessingTime(5 seconds))
    .queryName("rate-console")
    .start()
    .awaitTermination()
    2019-07-17 23:20:15
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载