Flink SQL 问题之用代码执行报错如何解决

简介: Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。

问题一:请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错


请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错 Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary. Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction 这是什么问题,可否使用Flink JDBC 从hudi 写入数据库呢?


参考回答:

报错信息中提到了 com.mysql.jdbc.Driver 这个类已经过时,应该使用 com.mysql.cj.jdbc.Driver 这个类。可以尝试更新 JDBC 驱动,使用新的 com.mysql.cj.jdbc.Driver 驱动类。

另外,报错信息中还提到了 java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction,这个错误可能是由于 Flink 版本不兼容导致的。建议检查 Flink 版本和相关依赖的版本是否匹配,尝试升级 Flink 版本或者降低相关依赖的版本。

关于使用 Flink JDBC 从 Hudi 写入数据库,是可以实现的。具体实现可以参考 Flink 官方文档中的示例代码,比如使用 JdbcOutputFormat 将数据写入数据库。在使用过程中,需要注意配置正确的 JDBC 连接信息和 SQL 语句,以及保证数据类型和表结构的兼容性。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/496537?spm=a2c6h.14164896.0.0.169063bfNaEsRK


问题二:请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

应该是你参数哪里没写对。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492408?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题三:各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

看报错是由于 Flink SQL CDC 模块无法正确连接到 PostgreSQL 或 MySQL 数据库引起的。

检查下数据库连接信息、数据库不存在或表不存在、数据驱动、数据库访问权限等。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492444?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题四:用代码执行flink sql 报错


错误:

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath

看意思是找到了两个一样的类:DynamicTableSinkFactory

代码如下: package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {     public static void main(String[] args) throws Exception {         // set up execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         StreamTableEnvironment tEnv;         EnvironmentSettings settings = EnvironmentSettings.newInstance()                 .useBlinkPlanner()                 .inStreamingMode()                 .build();         tEnv = StreamTableEnvironment.create(env, settings);         String src_sql = "CREATE TABLE userss (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING\n" +                 ") WITH (\n" +                 "      'connector' = 'mysql-cdc',\n" +                 "      'hostname' = '10.12.5.37',\n" +                 "      'port' = '3306',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'database-name' = 'rpt',\n" +                 "      'table-name' = 'users'\n" +                 "      )";         tEnv.executeSql(src_sql); // 创建表         String sink="CREATE TABLE sink (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING,\n" +                 "     primary key(user_id)  NOT ENFORCED \n" +                 ") WITH (\n" +                 "      'connector' = 'jdbc',\n" +                 "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'table-name' = 'sink'\n" +                 "      )";         String to_print_sql="insert into sink select user_id  ,user_nm   from userss";          tEnv.executeSql(sink);         tEnv.executeSql(to_print_sql);         env.execute();     }

}

详细错误:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'

'password'='dps1234'

'table-name'='sink'

'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'

'username'='dps'

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)

at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)

at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)

... 18 more

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)

... 19 more

Process finished with exit code 1*来自志愿者整理的flink邮件归档


参考回答:

你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 JDCB 的 Connector?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370089?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题五:flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因?


flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因? file:///C:/Users/29243/AppData/Roaming/DingTalk/270913990_v2/resource_cache/7d/7da241e8d354dd345b1975ce966d7d2a.png


参考回答:

rmq社区的flink sql接口目前支持度不高,有很多bug。我提交的修复由于当前版本之前有几次没有review的合入导致master启动不了。用我这个仓库里面的这个版本吧。您可以参考一下这个链接,里面可能有您想要的答案:https://github.com/deemogsw/rocketMQ-flink-connector


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/488157?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
948 43
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
402 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
7月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1052 1
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
478 15
|
8月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
425 0
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2209 27
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
1018 14
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
308 0
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
604 13

相关产品

  • 实时计算 Flink版