Flink SQL 问题之用代码执行报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。

问题一:请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错


请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错 Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary. Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction 这是什么问题,可否使用Flink JDBC 从hudi 写入数据库呢?


参考回答:

报错信息中提到了 com.mysql.jdbc.Driver 这个类已经过时,应该使用 com.mysql.cj.jdbc.Driver 这个类。可以尝试更新 JDBC 驱动,使用新的 com.mysql.cj.jdbc.Driver 驱动类。

另外,报错信息中还提到了 java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction,这个错误可能是由于 Flink 版本不兼容导致的。建议检查 Flink 版本和相关依赖的版本是否匹配,尝试升级 Flink 版本或者降低相关依赖的版本。

关于使用 Flink JDBC 从 Hudi 写入数据库,是可以实现的。具体实现可以参考 Flink 官方文档中的示例代码,比如使用 JdbcOutputFormat 将数据写入数据库。在使用过程中,需要注意配置正确的 JDBC 连接信息和 SQL 语句,以及保证数据类型和表结构的兼容性。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/496537?spm=a2c6h.14164896.0.0.169063bfNaEsRK


问题二:请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

应该是你参数哪里没写对。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492408?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题三:各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

看报错是由于 Flink SQL CDC 模块无法正确连接到 PostgreSQL 或 MySQL 数据库引起的。

检查下数据库连接信息、数据库不存在或表不存在、数据驱动、数据库访问权限等。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492444?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题四:用代码执行flink sql 报错


错误:

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath

看意思是找到了两个一样的类:DynamicTableSinkFactory

代码如下: package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {     public static void main(String[] args) throws Exception {         // set up execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         StreamTableEnvironment tEnv;         EnvironmentSettings settings = EnvironmentSettings.newInstance()                 .useBlinkPlanner()                 .inStreamingMode()                 .build();         tEnv = StreamTableEnvironment.create(env, settings);         String src_sql = "CREATE TABLE userss (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING\n" +                 ") WITH (\n" +                 "      'connector' = 'mysql-cdc',\n" +                 "      'hostname' = '10.12.5.37',\n" +                 "      'port' = '3306',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'database-name' = 'rpt',\n" +                 "      'table-name' = 'users'\n" +                 "      )";         tEnv.executeSql(src_sql); // 创建表         String sink="CREATE TABLE sink (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING,\n" +                 "     primary key(user_id)  NOT ENFORCED \n" +                 ") WITH (\n" +                 "      'connector' = 'jdbc',\n" +                 "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'table-name' = 'sink'\n" +                 "      )";         String to_print_sql="insert into sink select user_id  ,user_nm   from userss";          tEnv.executeSql(sink);         tEnv.executeSql(to_print_sql);         env.execute();     }

}

详细错误:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'

'password'='dps1234'

'table-name'='sink'

'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'

'username'='dps'

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)

at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)

at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)

... 18 more

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)

... 19 more

Process finished with exit code 1*来自志愿者整理的flink邮件归档


参考回答:

你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 JDCB 的 Connector?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370089?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题五:flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因?


flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因? file:///C:/Users/29243/AppData/Roaming/DingTalk/270913990_v2/resource_cache/7d/7da241e8d354dd345b1975ce966d7d2a.png


参考回答:

rmq社区的flink sql接口目前支持度不高,有很多bug。我提交的修复由于当前版本之前有几次没有review的合入导致master启动不了。用我这个仓库里面的这个版本吧。您可以参考一下这个链接,里面可能有您想要的答案:https://github.com/deemogsw/rocketMQ-flink-connector


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/488157?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
163 4
|
6月前
|
SQL Java 数据库连接
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
|
3月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
261 0
|
6月前
|
SQL
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
|
6月前
|
SQL Java 数据库连接
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
|
7月前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
7月前
|
SQL
【YashanDB 知识库】使用 leading hint 调整 SQL 执行计划后报错 YAS-04522 invalid hint leading
在 YashanDB 的所有版本中,使用 leading hint 调整 SQL 执行计划时可能出现“YAS-04522 invalid hint leading”错误,导致 SQL 无法正常执行。原因是 YashanDB 优化器的 Bug。解决方法为避免使用 leading hint。可通过创建测试表 a、b、c 并执行特定 SQL 语句来验证问题是否存在。
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1643 27
|
8月前
|
SQL Java 数据库连接
如何在 Java 代码中使用 JSqlParser 解析复杂的 SQL 语句?
大家好,我是 V 哥。JSqlParser 是一个用于解析 SQL 语句的 Java 库,可将 SQL 解析为 Java 对象树,支持多种 SQL 类型(如 `SELECT`、`INSERT` 等)。它适用于 SQL 分析、修改、生成和验证等场景。通过 Maven 或 Gradle 安装后,可以方便地在 Java 代码中使用。
2652 11

相关产品

  • 实时计算 Flink版