Flink报错问题之jdbc表报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:sql-client的jdbc表出错

你好,我创建了一个jdbc的表

创建表 mvp_dim_anticheat_args_all ( ID BIGINT, 字符串, cnt_7d INT, cnt_30d INT, 主键 (id) 未强制执行 ) 和 ( '连接器' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/huyou_oi', 'table-name' = 'mvp_dim_ll', '用户名' = 'huy_oi', '密码' = '420123' );

查询的时候报 [错误] 无法执行 SQL 语句。原因: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

我的安装目录lib下面有 flink-connector-jdbc_2.11-1.11.0.jar 和 mysql-connector-java-5.1.38.jar 这俩,请问是什么原因? 谢谢*来自志愿者整理的flink邮件归档



参考答案:

在 yarn 上提交作业可以,不代表通过 sqlclient 可以,他们使用的是不同的剧本和配置。 bin/yarn-session.sh, conf/flink-conf.yaml 相关,后跟 bin/sql-client.sh, conf/sql-client-defaults.yaml 关于。

你理一下这个逻辑,或者可以给你的相关配置文件,以及 sql-client.sh 启动完整命令。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371498?spm=a2c6h.13066369.question.38.6ad26382QkTdYJ



问题二:Blink的批量模式的并行度问题

大家好,

Flink 当前的blink table planner 批处理模式 (读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource, 但是StreamTableSource的并行度默认应该是1,可能是ContinuousFileMonitoringFunction , 那如何能扩大并行度来优化性能呢?*来自志愿者整理的flink邮件归档



参考答案:

你好,

如果你是FileInputFormat,默认就是1个并行度,这个参数我尝试了并像, 看代码是创建了一个SingleOutputStreamOperator,感觉得重写下我使用的OrcInputFormat, 让他不继承FileInputFormat,像源码里的HiveInputFormat一样*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371499?spm=a2c6h.13066369.question.37.6ad263822320FJ



问题三:找不到标识符“kafka”的任何工厂

你好, Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create语句的时候报错了 编译的jar包是jar-with-dependencies的

代码截图: public String ddlSql = String.format("CREATE TABLE %s (\n" + " 数字 BIGINT,\n" + " 味精字符串,\n" + " 用户名 STRING,\n" + " 更新时间时间戳(3)\n" + ") WITH (\n" + " '连接器' = '卡夫卡',\n" + " '主题' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " '格式' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", 表名, 主题, 服务器, 组);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);

报错信息: 引起:org.apache.flink.table.api.ValidationException:在类路径中找不到实现“org.apache.flink.table.factories.DynamicTableSourceFactory”的标识符“kafka”的任何工厂。 可用的工厂标识符有: 数据源 在 org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) 在 org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ……还有 33 个

参考了这个http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12,还是会报一样的错

附上pom依赖: <依赖> <依赖> org.apache.flink flink-java ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java ${flink.version} </依赖> <依赖> org.apache.flink flink-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-json ${flink.version} </依赖> </依赖>*来自志愿者整理的flink邮件归档



参考答案:

可能你的打包方式有关系。你要是直接在这个程序里面运行是可以运行的么?

如果可以在idea运行,但是打出来的jar包不能运行的话,很有可能和SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1]

[1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371501?spm=a2c6h.13066369.question.38.6ad26382eRok4A



问题四:flink聚合作业重启问题

大家好 :

请教问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表",将结果写入mysql聚合表中(SINK组件为) :flink1.11 版本JdbcUpsertTableSink)。 但问题是,每次JOB重启后,之前mysql聚合表结果会被清空。我设置了检查点和racksdbbackendstate。

感谢和问候*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371503?spm=a2c6h.13066369.question.39.6ad26382VjrkhY



问题五:【flink】flink sql insert into插入语句的问题

测试Flink版本:1.11.0

Flink 支持这种语法插入吗,在插入时指定具体的内容和要插入的列

插入 tableName(col1[,col2]) 选择 col1[,col2]

通过测试发现了以下问题

建表语句:

用 () 创建表 t1(a int,b string,c int);

用 () 创建表 t2(a int,b string,c int);

问题1:测试发现插入时发现和接收器模式的匹配规则是按照定义的顺序进行

测试语句:

insert into t2 select t1.a,t1.c, t1.b from t1;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题2:支持Insert into tableName(col1[,col2]) select col1[,col2]的语法,但并 没有自己,还是按照定义的顺序进行匹配

测试语句:

插入 t2(a,c,b) 从 t1 中选择 t1.a,t1.c, t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题3:当插入到你的字段比sink的模式中的字段少的时候也一样

测试语句:

插入 t2(a,b)

从 t1 中选择 t1.a、t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

总结:

现在的实现限制了查询的和写人的知识,

只能找到schema定义的字段顺序才能正确的插入,

当很多时候会比较麻烦,

还有,只能插入部分列的需求也是存在的,目前不能支持*来自志愿者整理的flink邮件归档



参考答案:

你好,

Flink 目前已经不支持这个语法……我创建了一个问题[1],可以在里面查出这个特性的进展。

[1] https://issues.apache.org/jira/browse/FLINK-18726*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371505?spm=a2c6h.13066369.question.42.6ad26382kqhSrw

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
52 2
|
1月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
22 2
|
1月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之用superset连接starrocks报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 增量问题之增量数据会报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
SQL Java 关系型数据库
MySQL之JDBC(二)
MySQL之JDBC(二)
34 0
|
3月前
|
关系型数据库 MySQL Java
MySQL之JDBC(一)
MySQL之JDBC
33 0
|
2天前
|
SQL 关系型数据库 MySQL
Spring_jdbc数据连接池(mysql实现增、删、改、查)
Spring_jdbc数据连接池(mysql实现增、删、改、查)
8 0

相关产品

  • 实时计算 Flink版