Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDSClaw,2核4GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives



bc77bb4e6f814625b865bdde621424d3.jpeg



0. 写在前面

  • Spark : Spark3.0.0 
  • Scala :  Scala2.12 


1. 报错


overloaded method value foreachBatch with alternatives:


2. 代码及报错信息


Error:(48, 12) overloaded method value foreachBatch with alternatives:


(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink1 {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("update")
            .foreachBatch((df, batchId) => {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach1")
result.unpersist()
            })
//            .trigger(Trigger.ProcessingTime(0))            .trigger(Trigger.Continuous(10))
            .startquery.awaitTermination()
    }
}




Error:(43, 12) overloaded method value foreachBatch with alternatives:
(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("complete")
            .foreachBatch((df, batchId) => {          
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach")
result.unpersist()
            })
            .startquery.awaitTermination()
    }
}



3. 原因及纠错


Scala2.12版本和2.11版本的不同,对于foreachBatch()方法的实现不太一样


正确代码如下 

importjava.util.Propertiesimportorg.apache.spark.sql.streaming.StreamingQueryimportorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties): Unit= {
println("BatchId"+batchId)
if (df.count() !=0) {
df.persist()
df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
df.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalwordCount: DataFrame=lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value countvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=wordCount.writeStream          .outputMode("complete")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props)
          })
          .startquery.awaitTermination()
    }
}





importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink1 {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit= {
importspark.implicits._println("BatchId = "+batchId)
if (df.count() !=0) {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
result.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream          .outputMode("update")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props, spark)
          })
          .trigger(Trigger.Continuous(10))
          .startquery.awaitTermination()
    }
}


4. 参考链接

https://blog.csdn.net/Shockang/article/details/120961968

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
机器学习/深度学习 算法 计算机视觉
Yolov5 + 界面PyQt5 +.exe文件部署运行
Yolov5 + 界面PyQt5 +.exe文件部署运行
|
Ubuntu Linux Python
【服务器部署】让程序在服务器后台持续运行
在服务器后台运行项目不方便管理?不会在后台运行项目?快来试试Screen吧
1505 0
|
6月前
|
JSON 负载均衡 监控
《服务治理》Thrift与gRPC深度对比与实践
在微服务架构中,服务间通信是系统设计的核心环节。RPC(Remote Procedure Call)框架通过抽象网络通信细节,让开发者能够像调用本地方法一样调用远程服务,极大地提升了开发效率。
|
C语言
【C语言】符号优先级详解 -《谁与争锋 ! 》
理解C语言中的运算符优先级和结合性是编写正确代码的关键。本文详细介绍了C语言中的各种运算符、它们的优先级和结合性,并通过示例展示了如何正确使用这些运算符。掌握这些知识,将有助于编写出逻辑严谨、结构清晰的C语言程序。
797 8
|
存储 NoSQL Redis
【Redis从头学-12】Redis主从复制和读写分离的多种部署方式解析(普通方式、Docker搭建方式、Docker-Compose搭建方式)上
【Redis从头学-12】Redis主从复制和读写分离的多种部署方式解析(普通方式、Docker搭建方式、Docker-Compose搭建方式)
442 0
|
数据处理 Python
python遍历文件夹所有文件按什么排序
python遍历文件夹所有文件按什么排序
359 1
|
存储 运维 Kubernetes
k8s学习笔记之StorageClass+NFS
k8s学习笔记之StorageClass+NFS
|
关系型数据库 MySQL
清理MySQL的binlog日志
清理MySQL的binlog日志
1996 0
|
数据采集 Python
requests库中Session对象超时解决过程
requests库中Session对象超时解决过程