如何使用spark将kafka主题中的writeStream数据写入hdfs?-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

如何使用spark将kafka主题中的writeStream数据写入hdfs?

2019-01-02 14:47:05 5872 2

我一直试图让这段代码工作几个小时:

val spark = SparkSession.builder()
.appName("Consumer")
.getOrCreate()

spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.select("value")
.writeStream
.format(fileFormat)
.option("path", filePath)
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
它给出了这个例外:

Logical Plan:
Project [value#8]
+- StreamingExecutionRelation KafkaV2[Subscribe[MyTopic]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
我不明白发生了什么,我只是试图使用spark streaming将kafka主题的数据写入HDFS。为什么这么难?我怎么能这样做?

我得到了批处理版本才能正常工作:

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", url)
.option("subscribe", topic)
.load()
.selectExpr("CAST(value AS String)")
.write
.format(fileFormat)
.save(filePath)

取消 提交回答
全部回答(2)
添加回答
相关问答

1

回答

Spark中数据的位置是如何管理的?

2021-12-07 17:39:53 113浏览量 回答数 1

1

回答

Spark中数据的位置是被谁管理的啊?

2021-12-06 17:31:09 146浏览量 回答数 1

3

回答

Spark中数据的位置是被谁管理的?

2019-10-28 16:07:47 2768浏览量 回答数 3

1

回答

Spark Streaming 原理是什么?

2022-01-13 15:54:26 1118浏览量 回答数 1

1

回答

Spark Streaming中的DStream的转化操作是什么?

2021-12-07 20:17:05 111浏览量 回答数 1

1

回答

Spark Streaming中的DStream的输出操作是什么?

2021-12-07 20:17:49 136浏览量 回答数 1

1

回答

spark streaming中的foreachRDD(func)方法是什么?

2021-12-07 08:05:05 291浏览量 回答数 1

1

回答

使用Spark Streaming SQL进行 PV/UV统计的准备工作?

2020-12-28 11:39:00 418浏览量 回答数 1

1

回答

如何使用Spark Streaming SQL进行 PV/UV统计?

2020-12-28 11:38:30 486浏览量 回答数 1

1

回答

spark streaming实时程序 进程总是运行三四个小时左右不知道原因挂掉

2019-12-20 21:25:15 877浏览量 回答数 1
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载
冬季实战营第三期:MySQL数据库进阶实战
立即下载