Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives



bc77bb4e6f814625b865bdde621424d3.jpeg



0. 写在前面

  • Spark : Spark3.0.0 
  • Scala :  Scala2.12 


1. 报错


overloaded method value foreachBatch with alternatives:


2. 代码及报错信息


Error:(48, 12) overloaded method value foreachBatch with alternatives:


(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink1 {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("update")
            .foreachBatch((df, batchId) => {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach1")
result.unpersist()
            })
//            .trigger(Trigger.ProcessingTime(0))            .trigger(Trigger.Continuous(10))
            .startquery.awaitTermination()
    }
}




Error:(43, 12) overloaded method value foreachBatch with alternatives:
(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("complete")
            .foreachBatch((df, batchId) => {          
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach")
result.unpersist()
            })
            .startquery.awaitTermination()
    }
}



3. 原因及纠错


Scala2.12版本和2.11版本的不同,对于foreachBatch()方法的实现不太一样


正确代码如下 

importjava.util.Propertiesimportorg.apache.spark.sql.streaming.StreamingQueryimportorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties): Unit= {
println("BatchId"+batchId)
if (df.count() !=0) {
df.persist()
df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
df.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalwordCount: DataFrame=lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value countvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=wordCount.writeStream          .outputMode("complete")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props)
          })
          .startquery.awaitTermination()
    }
}





importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink1 {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit= {
importspark.implicits._println("BatchId = "+batchId)
if (df.count() !=0) {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
result.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream          .outputMode("update")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props, spark)
          })
          .trigger(Trigger.Continuous(10))
          .startquery.awaitTermination()
    }
}


4. 参考链接

https://blog.csdn.net/Shockang/article/details/120961968

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
网络协议 Java Nacos
Nacos报错问题之jar 包启动就报错误如何解决
Nacos是一个开源的、易于部署的动态服务发现、配置管理和服务管理平台,旨在帮助微服务架构下的应用进行快速配置更新和服务治理;在实际运用中,用户可能会遇到各种报错,本合集将常见的Nacos报错问题进行归纳和解答,以便使用者能够快速定位和解决这些问题。
|
机器学习/深度学习 算法 计算机视觉
Yolov5 + 界面PyQt5 +.exe文件部署运行
Yolov5 + 界面PyQt5 +.exe文件部署运行
|
数据采集 Prometheus 监控
通过micrometer实时监控线程池的各项指标
最近的一个项目中涉及到文件上传和下载,使用到JUC的线程池ThreadPoolExecutor,在生产环境中出现了某些时刻线程池满负载运作,由于使用了CallerRunsPolicy拒绝策略,导致满负载情况下,应用接口调用无法响应,处于假死状态。考虑到之前用micrometer + prometheus + grafana搭建过监控体系,于是考虑使用micrometer做一次主动的线程池度量数据采集,最终可以相对实时地展示在grafana的面板中。
1276 0
通过micrometer实时监控线程池的各项指标
|
Ubuntu Linux Python
【服务器部署】让程序在服务器后台持续运行
在服务器后台运行项目不方便管理?不会在后台运行项目?快来试试Screen吧
1221 0
|
9月前
|
SQL 关系型数据库 MySQL
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 `EXPLAIN` 语句用于分析和优化 SQL 查询,帮助你了解查询优化器的执行计划。本文详细介绍了 `EXPLAIN` 输出的各项指标,如 `id`、`select_type`、`table`、`type`、`key` 等,并提供了如何利用这些指标优化索引结构和 SQL 语句的具体方法。通过实战案例,展示了如何通过创建合适索引和调整查询语句来提升查询性能。
1741 10
|
SQL
SqlServer 服务无法启动 操作系统错误: 5(拒绝访问。)
SqlServer 服务无法启动 操作系统错误: 5(拒绝访问。)
483 0
SqlServer 服务无法启动 操作系统错误: 5(拒绝访问。)
|
9月前
|
C语言
【C语言】符号优先级详解 -《谁与争锋 ! 》
理解C语言中的运算符优先级和结合性是编写正确代码的关键。本文详细介绍了C语言中的各种运算符、它们的优先级和结合性,并通过示例展示了如何正确使用这些运算符。掌握这些知识,将有助于编写出逻辑严谨、结构清晰的C语言程序。
350 8
|
传感器 机器学习/深度学习 编解码
一文尽览 | 基于点云、多模态的3D目标检测算法综述!(Point/Voxel/Point-Voxel)(下)
目前3D目标检测领域方案主要包括基于单目、双目、激光雷达点云、多模态数据融合等方式,本文主要介绍基于激光雷达雷达点云、多模态数据的相关算法,下面展开讨论下~
一文尽览 | 基于点云、多模态的3D目标检测算法综述!(Point/Voxel/Point-Voxel)(下)
|
存储 NoSQL Redis
【Redis从头学-12】Redis主从复制和读写分离的多种部署方式解析(普通方式、Docker搭建方式、Docker-Compose搭建方式)上
【Redis从头学-12】Redis主从复制和读写分离的多种部署方式解析(普通方式、Docker搭建方式、Docker-Compose搭建方式)
328 0