Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives



bc77bb4e6f814625b865bdde621424d3.jpeg



0. 写在前面

  • Spark : Spark3.0.0 
  • Scala :  Scala2.12 


1. 报错


overloaded method value foreachBatch with alternatives:


2. 代码及报错信息


Error:(48, 12) overloaded method value foreachBatch with alternatives:


(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink1 {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("update")
            .foreachBatch((df, batchId) => {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach1")
result.unpersist()
            })
//            .trigger(Trigger.ProcessingTime(0))            .trigger(Trigger.Continuous(10))
            .startquery.awaitTermination()
    }
}




Error:(43, 12) overloaded method value foreachBatch with alternatives:
(function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
(function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
.foreachBatch((df, batchId) => {


importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, SparkSession}
objectForeachBatchSink {
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream            .format("socket") // 设置数据源            .option("host", "cluster01")
            .option("port", 10000)
            .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream            .outputMode("complete")
            .foreachBatch((df, batchId) => {          
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./foreach")
result.unpersist()
            })
            .startquery.awaitTermination()
    }
}



3. 原因及纠错


Scala2.12版本和2.11版本的不同,对于foreachBatch()方法的实现不太一样


正确代码如下 

importjava.util.Propertiesimportorg.apache.spark.sql.streaming.StreamingQueryimportorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties): Unit= {
println("BatchId"+batchId)
if (df.count() !=0) {
df.persist()
df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
df.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalwordCount: DataFrame=lines.as[String]
          .flatMap(_.split("\\W+"))
          .groupBy("value")
          .count()  // value countvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=wordCount.writeStream          .outputMode("complete")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props)
          })
          .startquery.awaitTermination()
    }
}





importjava.util.Propertiesimportorg.apache.spark.sql.streaming.{StreamingQuery, Trigger}
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
objectForeachBatchSink1 {
defmyFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit= {
importspark.implicits._println("BatchId = "+batchId)
if (df.count() !=0) {
valresult=df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
result.persist()
result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
result.unpersist()
        }
    }
defmain(args: Array[String]): Unit= {
valspark: SparkSession=SparkSession          .builder()
          .master("local[2]")
          .appName("ForeachBatchSink1")
          .getOrCreate()
importspark.implicits._vallines: DataFrame=spark.readStream          .format("socket") // TODO 设置数据源          .option("host", "cluster01")
          .option("port", 10000)
          .loadvalprops=newProperties()
props.setProperty("user", "root")
props.setProperty("password", "1234")
valquery: StreamingQuery=lines.writeStream          .outputMode("update")
          .foreachBatch((df : Dataset[Row], batchId : Long) => {
myFun(df, batchId, props, spark)
          })
          .trigger(Trigger.Continuous(10))
          .startquery.awaitTermination()
    }
}


4. 参考链接

https://blog.csdn.net/Shockang/article/details/120961968

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
关系型数据库 MySQL Java
Flink作业报错:Caused by: The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server
Flink作业报错:Caused by: The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server
Flink作业报错:Caused by: The connector is trying to read binlog starting at GTIDs ..., but this is no longer available on the server
|
6月前
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
SQL HIVE
DataGrip连接Hive执行DDL操作报错:「FAILED: ParseException line 1:5 cannot recognize input near 'show' 'indexes' 'on' in ddl statement」
DataGrip连接Hive执行DDL操作报错:「FAILED: ParseException line 1:5 cannot recognize input near 'show' 'indexes' 'on' in ddl statement」
594 0
DataGrip连接Hive执行DDL操作报错:「FAILED: ParseException line 1:5 cannot recognize input near 'show' 'indexes' 'on' in ddl statement」
|
6月前
|
存储 NoSQL 关系型数据库
实时计算 Flink版操作报错之抽取Oracle11g时,报错: "Retrieve schema history failed, the schema records for engine ... has been removed",怎么处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
332 0
|
6月前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
314 0
|
6月前
|
SQL 数据处理 Apache
Flink报错问题之Flink报错Only a single 'INSERT INTO' is supported如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
6月前
|
SQL 消息中间件 Kubernetes
Flink报错问题之Flink报错:Incompatible types for sink column 'xxx' at position x.如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
SQL JSON 分布式计算
Spark SQL实战(07)-Data Sources
Spark SQL通过DataFrame接口支持对多种数据源进行操作。 DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。
227 0
|
流计算 索引
flink-sql入es报错:Missing required options are document-type
我是在flink-sql创建es表的时候报的错,报错提示缺少对应的options,及document-type ,我连忙去flink官方文档查找答案
399 0
flink-sql入es报错:Missing required options are document-type