Flink SQL 问题之用代码执行报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。

问题一:请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错


请问一下,我已经使用Flink的SQL Client通过JDBC连接模式将数据从SQL Server导入到Hudi表中 然后将数据从hudi导入到SQL Server另一个表中 报错 Loading class com.mysql.jdbc.Driver'. This is deprecated. The new driver class iscom.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary. Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction 这是什么问题,可否使用Flink JDBC 从hudi 写入数据库呢?


参考回答:

报错信息中提到了 com.mysql.jdbc.Driver 这个类已经过时,应该使用 com.mysql.cj.jdbc.Driver 这个类。可以尝试更新 JDBC 驱动,使用新的 com.mysql.cj.jdbc.Driver 驱动类。

另外,报错信息中还提到了 java.lang.NoClassDefFoundError: org/apache/flink/util/function/SerializableFunction,这个错误可能是由于 Flink 版本不兼容导致的。建议检查 Flink 版本和相关依赖的版本是否匹配,尝试升级 Flink 版本或者降低相关依赖的版本。

关于使用 Flink JDBC 从 Hudi 写入数据库,是可以实现的。具体实现可以参考 Flink 官方文档中的示例代码,比如使用 JdbcOutputFormat 将数据写入数据库。在使用过程中,需要注意配置正确的 JDBC 连接信息和 SQL 语句,以及保证数据类型和表结构的兼容性。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/496537?spm=a2c6h.14164896.0.0.169063bfNaEsRK


问题二:请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


请问flink sql cdc pg或者mysql 报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

应该是你参数哪里没写对。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492408?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题三:各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'


各位大侠,flink sql cdc pg或者mysql报这个错的原因是什么,报错: Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.pgsql_source'



参考回答:

看报错是由于 Flink SQL CDC 模块无法正确连接到 PostgreSQL 或 MySQL 数据库引起的。

检查下数据库连接信息、数据库不存在或表不存在、数据驱动、数据库访问权限等。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/492444?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题四:用代码执行flink sql 报错


错误:

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath

看意思是找到了两个一样的类:DynamicTableSinkFactory

代码如下: package org.apache.flink.examples;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class CDC2ss2 {     public static void main(String[] args) throws Exception {         // set up execution environment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         StreamTableEnvironment tEnv;         EnvironmentSettings settings = EnvironmentSettings.newInstance()                 .useBlinkPlanner()                 .inStreamingMode()                 .build();         tEnv = StreamTableEnvironment.create(env, settings);         String src_sql = "CREATE TABLE userss (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING\n" +                 ") WITH (\n" +                 "      'connector' = 'mysql-cdc',\n" +                 "      'hostname' = '10.12.5.37',\n" +                 "      'port' = '3306',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'database-name' = 'rpt',\n" +                 "      'table-name' = 'users'\n" +                 "      )";         tEnv.executeSql(src_sql); // 创建表         String sink="CREATE TABLE sink (\n" +                 "     user_id INT,\n" +                 "     user_nm STRING,\n" +                 "     primary key(user_id)  NOT ENFORCED \n" +                 ") WITH (\n" +                 "      'connector' = 'jdbc',\n" +                 "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +                 "      'username' = 'dps',\n" +                 "      'password' = 'dps1234',\n" +                 "      'table-name' = 'sink'\n" +                 "      )";         String to_print_sql="insert into sink select user_id  ,user_nm   from userss";          tEnv.executeSql(sink);         tEnv.executeSql(to_print_sql);         env.execute();     }

}

详细错误:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.

Table options are:

'connector'='jdbc'

'password'='dps1234'

'table-name'='sink'

'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'

'username'='dps'

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)

at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)

at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

at scala.collection.AbstractTraversable.map(Traversable.scala:104)

at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)

at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)

at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)

... 18 more

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.

Ambiguous factory classes are:

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

java.util.LinkedList

at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)

at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)

... 19 more

Process finished with exit code 1*来自志愿者整理的flink邮件归档


参考回答:

你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 JDCB 的 Connector?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370089?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc


问题五:flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因?


flink sql 读取rocketmq的数据,但是提示如下报错,请问这是啥原因? file:///C:/Users/29243/AppData/Roaming/DingTalk/270913990_v2/resource_cache/7d/7da241e8d354dd345b1975ce966d7d2a.png


参考回答:

rmq社区的flink sql接口目前支持度不高,有很多bug。我提交的修复由于当前版本之前有几次没有review的合入导致master启动不了。用我这个仓库里面的这个版本吧。您可以参考一下这个链接,里面可能有您想要的答案:https://github.com/deemogsw/rocketMQ-flink-connector


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/488157?spm=a2c6h.14164896.0.0.52bb63bf2LAFqc

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8天前
|
SQL XML Java
mybatis 调用修改SQL时 出现了一个问题 没有修改成功也没有报错
mybatis 调用修改SQL时 出现了一个问题 没有修改成功也没有报错
16 0
|
27天前
|
Java 开发工具 流计算
flink最新master代码编译出现Java Runtime Environment 问题
在尝试编译Flink源码时遇到Java运行时环境致命错误:EXCEPTION_ACCESS_VIOLATION。问题出现在JVM.dll+0x88212。使用的是Java 11.0.28和Java HotSpot(TM) 64-Bit Server VM。系统为Windows客户端,没有生成核心dump文件。错误日志保存在hs_err_pid39364.log和replay_pid39364.log。要解决这个问题,建议检查JDK版本兼容性,更新JDK或参照错误报告文件提交Bug至http://bugreport.java.com/bugreport/crash.jsp。
|
2天前
|
SQL 关系型数据库 分布式数据库
数据管理DMS操作报错合集之DMS SQL执行失败且无法看到原因,如何解决
数据管理DMS(Data Management Service)是阿里云提供的数据库管理和运维服务,它支持多种数据库类型,包括RDS、PolarDB、MongoDB等。在使用DMS进行数据库操作时,可能会遇到各种报错情况。以下是一些常见的DMS操作报错及其可能的原因与解决措施的合集。
|
7天前
|
SQL
SQL Server2008 安装报错Restart computer failed的解决办法
SQL Server2008 安装报错Restart computer failed的解决办法
10 0
|
2月前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
108 0
|
2月前
|
SQL 存储 Kubernetes
Seata常见问题之mybatisplus的批量插入方法报SQL错误如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
43 0
|
2月前
|
SQL XML Java
整理几个常用的sql和其他代码
整理几个常用的sql和其他代码
12 1
|
2月前
|
SQL 数据库 索引
解决SQL报错:索引中丢失IN或OUT參数
解决SQL报错:索引中丢失IN或OUT參数
|
2月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
19天前
|
SQL 人工智能 算法
【SQL server】玩转SQL server数据库:第二章 关系数据库
【SQL server】玩转SQL server数据库:第二章 关系数据库
61 10

相关产品

  • 实时计算 Flink版