Structured Streaming error log: overloaded method value foreachBatch with alternatives
0. Preface
- Spark: 3.0.0
- Scala: 2.12
1. The error
overloaded method value foreachBatch with alternatives:
2. Code and error messages
Compiler error for the first program (ForeachBatchSink1):

Error:(48, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
      .foreachBatch((df, batchId) => {
First program (ForeachBatchSink1):

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df, batchId) => {
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach1")
        result.unpersist()
      })
      // .trigger(Trigger.ProcessingTime(0))
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
Compiler error for the second program (ForeachBatchSink):

Error:(43, 12) overloaded method value foreachBatch with alternatives:
  (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
      .foreachBatch((df, batchId) => {
Second program (ForeachBatchSink):

import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSink {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ForeachSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("complete")
      .foreachBatch((df, batchId) => {
        // word count for the current micro-batch
        val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
        result.persist()
        result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
        result.write.mode("overwrite").json("./foreach")
        result.unpersist()
      })
      .start

    query.awaitTermination()
  }
}
3. Cause and fix
The root cause is a difference between Scala 2.12 and Scala 2.11 in how a lambda passed to foreachBatch() is matched against its overloads. foreachBatch() has two alternatives: one taking a Scala function (Dataset[Row], Long) => Unit and one taking a Java org.apache.spark.api.java.function.VoidFunction2[Dataset[Row], java.lang.Long]. Under Scala 2.12 a function literal can also be SAM-converted to the Java interface, so the compiler no longer has a single expected type from which to infer the lambda's parameter and result types: batchId ends up typed as Any, and because the block's last expression is result.unpersist() the lambda returns a Dataset/DataFrame instead of Unit. Neither alternative applies, which is exactly what the error reports. The fix is to annotate the lambda parameters explicitly, (df: Dataset[Row], batchId: Long), and make sure the body's type is Unit (here, by delegating the per-batch work to a helper method).
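In isolation, the change looks like this (a minimal sketch rather than the full program; wordCount is the streaming word-count DataFrame defined in the corrected code below):

// Explicit parameter types let the compiler pick the
// (Dataset[Row], Long) => Unit overload, and a body whose last
// expression is a write (i.e. Unit) satisfies its result type.
val query: StreamingQuery = wordCount.writeStream
  .outputMode("complete")
  .foreachBatch((df: Dataset[Row], batchId: Long) => {
    df.write.mode("overwrite").json("./foreach")
  })
  .start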
The complete corrected code for both programs follows.
import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink {

  // Writes one micro-batch to MySQL and to a JSON directory
  def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      df.persist()
      df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
      df.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val wordCount: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count() // columns: value, count

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = wordCount.writeStream
      .outputMode("complete")
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props)
      })
      .start

    query.awaitTermination()
  }
}
import java.util.Properties

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object ForeachBatchSink1 {

  // Builds the word count for one micro-batch and writes it to MySQL and JSON
  def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark: SparkSession): Unit = {
    import spark.implicits._
    println("BatchId = " + batchId)
    if (df.count() != 0) {
      val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
      result.persist()
      result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test", "wc", props)
      result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
      result.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ForeachBatchSink1")
      .getOrCreate()
    import spark.implicits._

    val lines: DataFrame = spark.readStream
      .format("socket") // TODO set the data source
      .option("host", "cluster01")
      .option("port", 10000)
      .load

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "1234")

    val query: StreamingQuery = lines.writeStream
      .outputMode("update")
      .foreachBatch((df: Dataset[Row], batchId: Long) => {
        myFun(df, batchId, props, spark)
      })
      .trigger(Trigger.Continuous(10))
      .start

    query.awaitTermination()
  }
}
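A variant of the same fix (not from the original post, just another way to resolve the overload) is to declare the batch handler as a function value with an explicit (Dataset[Row], Long) => Unit type and pass that value to foreachBatch; since the value matches only the Scala overload, the ambiguity disappears as well:

// Hypothetical alternative: declare the handler with an explicit
// (Dataset[Row], Long) => Unit type instead of annotating the lambda inline.
val writeBatch: (Dataset[Row], Long) => Unit = (df, batchId) => {
  println("BatchId = " + batchId)
  df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
}

val query: StreamingQuery = wordCount.writeStream
  .outputMode("complete")
  .foreachBatch(writeBatch)
  .start

To try either program locally, feed the socket source with nc -lk 10000 on cluster01 (assuming netcat is available) and watch the per-batch output appear in MySQL and in the JSON directory.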