Flink SQL 问题之用代码执行报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。

问题一:请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错


请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错 Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary. Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction 这是什么问题,可否使用Flink JDBC 从hudi 写入数据库呢?


参考回答:

报错信息中提到了 com.mysql.jdbc.Driver 这个类已经过时,应该使用 com.mysql.cj.jdbc.Driver 这个类。可以尝试更新 JDBC 驱动,使用新的 com.mysql.cj.jdbc.Driver 驱动类。

另外,报错信息中还提到了 java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction,这个错误可能是由于 Flink 版本不兼容导致的。建议检查 Flink 版本和相关依赖的版本是否匹配,尝试升级 Flink 版本或者降低相关依赖的版本。

关于使用 Flink JDBC 从 Hudi 写入数据库,是可以实现的。具体实现可以参考 Flink 官方文档中的示例代码,比如使用 JdbcOutputFormat 将数据写入数据库。在使用过程中,需要注意配置正确的 JDBC 连接信息和 SQL 语句,以及保证数据类型和表结构的兼容性。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/496537?spm=a2c6h.14164896.0.0.169063bfNaEsRK


问题二:请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

应该是你参数哪里没写对。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492408?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题三:各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

看报错是由于 Flink SQL CDC 模块无法正确连接到 PostgreSQL 或 MySQL 数据库引起的。

检查下数据库连接信息、数据库不存在或表不存在、数据驱动、数据库访问权限等。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492444?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题四:用代码执行flink sql 报错


错误:

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath

看意思是找到了两个一样的类:DynamicTableSinkFactory

代码如下: package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {     public static void main(String[] args) throws Exception {         // set up execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         StreamTableEnvironment tEnv;         EnvironmentSettings settings = EnvironmentSettings.newInstance()                 .useBlinkPlanner()                 .inStreamingMode()                 .build();         tEnv = StreamTableEnvironment.create(env, settings);         String src_sql = "CREATE TABLE userss (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING\n" +                 ") WITH (\n" +                 "      'connector' = 'mysql-cdc',\n" +                 "      'hostname' = '10.12.5.37',\n" +                 "      'port' = '3306',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'database-name' = 'rpt',\n" +                 "      'table-name' = 'users'\n" +                 "      )";         tEnv.executeSql(src_sql); // 创建表         String sink="CREATE TABLE sink (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING,\n" +                 "     primary key(user_id)  NOT ENFORCED \n" +                 ") WITH (\n" +                 "      'connector' = 'jdbc',\n" +                 "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'table-name' = 'sink'\n" +                 "      )";         String to_print_sql="insert into sink select user_id  ,user_nm   from userss";          tEnv.executeSql(sink);         tEnv.executeSql(to_print_sql);         env.execute();     }

}

详细错误:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'

'password'='dps1234'

'table-name'='sink'

'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'

'username'='dps'

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)

at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)

at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)

... 18 more

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)

... 19 more

Process finished with exit code 1*来自志愿者整理的flink邮件归档


参考回答:

你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 JDCB 的 Connector?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370089?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题五:flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因?


flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因? file:///C:/Users/29243/AppData/Roaming/DingTalk/270913990_v2/resource_cache/7d/7da241e8d354dd345b1975ce966d7d2a.png


参考回答:

rmq社区的flink sql接口目前支持度不高,有很多bug。我提交的修复由于当前版本之前有几次没有review的合入导致master启动不了。用我这个仓库里面的这个版本吧。您可以参考一下这个链接,里面可能有您想要的答案:https://github.com/deemogsw/rocketMQ-flink-connector


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/488157?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
|
2月前
|
SQL Java 数据库连接
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
|
2月前
|
SQL
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
|
2月前
|
SQL Java 数据库连接
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
|
7月前
|
SQL 关系型数据库 MySQL
这样的SQL执行为什么不会报错?optimizer_trace深度历险
【10月更文挑战第12天】本文探讨了一条看似错误但实际上能成功执行的SQL语句,通过开启MySQL的优化器追踪功能,详细分析了SQL的执行过程,揭示了子查询被优化器解析为连接操作的原因,最终解释了为何该SQL不会报错。文章不仅增进了对SQL优化机制的理解,也展示了如何利用优化器追踪解决实际问题。
|
3月前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
3月前
|
SQL
【YashanDB 知识库】使用 leading hint 调整 SQL 执行计划后报错 YAS-04522 invalid hint leading
在 YashanDB 的所有版本中,使用 leading hint 调整 SQL 执行计划时可能出现“YAS-04522 invalid hint leading”错误,导致 SQL 无法正常执行。原因是 YashanDB 优化器的 Bug。解决方法为避免使用 leading hint。可通过创建测试表 a、b、c 并执行特定 SQL 语句来验证问题是否存在。
|
4月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
582 26
|
4月前
|
SQL Java 数据库连接
如何在 Java 代码中使用 JSqlParser 解析复杂的 SQL 语句?
大家好,我是 V 哥。JSqlParser 是一个用于解析 SQL 语句的 Java 库,可将 SQL 解析为 Java 对象树,支持多种 SQL 类型(如 `SELECT`、`INSERT` 等)。它适用于 SQL 分析、修改、生成和验证等场景。通过 Maven 或 Gradle 安装后,可以方便地在 Java 代码中使用。
1207 11
|
5月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
348 14

相关产品

  • 实时计算 Flink版