Flink报错问题之jdbc表报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:sql-client的jdbc表出错

你好,我创建了一个jdbc的表

创建表 mvp_dim_anticheat_args_all ( ID BIGINT, 字符串, cnt_7d INT, cnt_30d INT, 主键 (id) 未强制执行 ) 和 ( '连接器' = 'jdbc', 'driver'='com.mysql.jdbc.Driver', 'url' = 'jdbc:mysql://localhost:3306/huyou_oi', 'table-name' = 'mvp_dim_ll', '用户名' = 'huy_oi', '密码' = '420123' );

查询的时候报 [错误] 无法执行 SQL 语句。原因: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

我的安装目录lib下面有 flink-connector-jdbc_2.11-1.11.0.jar 和 mysql-connector-java-5.1.38.jar 这俩,请问是什么原因? 谢谢*来自志愿者整理的flink邮件归档



参考答案:

在 yarn 上提交作业可以,不代表通过 sqlclient 可以,他们使用的是不同的剧本和配置。 bin/yarn-session.sh, conf/flink-conf.yaml 相关,后跟 bin/sql-client.sh, conf/sql-client-defaults.yaml 关于。

你理一下这个逻辑,或者可以给你的相关配置文件,以及 sql-client.sh 启动完整命令。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371498?spm=a2c6h.13066369.question.38.6ad26382QkTdYJ



问题二:Blink的批量模式的并行度问题

大家好,

Flink 当前的blink table planner 批处理模式 (读hdfs上的orc文件)只支持StreamTableSource和LookupableTableSource, 但是StreamTableSource的并行度默认应该是1,可能是ContinuousFileMonitoringFunction , 那如何能扩大并行度来优化性能呢?*来自志愿者整理的flink邮件归档



参考答案:

你好,

如果你是FileInputFormat,默认就是1个并行度,这个参数我尝试了并像, 看代码是创建了一个SingleOutputStreamOperator,感觉得重写下我使用的OrcInputFormat, 让他不继承FileInputFormat,像源码里的HiveInputFormat一样*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371499?spm=a2c6h.13066369.question.37.6ad263822320FJ



问题三:找不到标识符“kafka”的任何工厂

你好, Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create语句的时候报错了 编译的jar包是jar-with-dependencies的

代码截图: public String ddlSql = String.format("CREATE TABLE %s (\n" + " 数字 BIGINT,\n" + " 味精字符串,\n" + " 用户名 STRING,\n" + " 更新时间时间戳(3)\n" + ") WITH (\n" + " '连接器' = '卡夫卡',\n" + " '主题' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " '格式' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", 表名, 主题, 服务器, 组);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql);

报错信息: 引起:org.apache.flink.table.api.ValidationException:在类路径中找不到实现“org.apache.flink.table.factories.DynamicTableSourceFactory”的标识符“kafka”的任何工厂。 可用的工厂标识符有: 数据源 在 org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) 在 org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ……还有 33 个

参考了这个http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12,还是会报一样的错

附上pom依赖: <依赖> <依赖> org.apache.flink flink-java ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-table-api-java ${flink.version} </依赖> <依赖> org.apache.flink flink-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} </依赖> <依赖> org.apache.flink flink-json ${flink.version} </依赖> </依赖>*来自志愿者整理的flink邮件归档



参考答案:

可能你的打包方式有关系。你要是直接在这个程序里面运行是可以运行的么?

如果可以在idea运行,但是打出来的jar包不能运行的话,很有可能和SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1]

[1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371501?spm=a2c6h.13066369.question.38.6ad26382eRok4A



问题四:flink聚合作业重启问题

大家好 :

请教问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt from 流表",将结果写入mysql聚合表中(SINK组件为) :flink1.11 版本JdbcUpsertTableSink)。 但问题是,每次JOB重启后,之前mysql聚合表结果会被清空。我设置了检查点和racksdbbackendstate。

感谢和问候*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371503?spm=a2c6h.13066369.question.39.6ad26382VjrkhY



问题五:【flink】flink sql insert into插入语句的问题

测试Flink版本:1.11.0

Flink 支持这种语法插入吗,在插入时指定具体的内容和要插入的列

插入 tableName(col1[,col2]) 选择 col1[,col2]

通过测试发现了以下问题

建表语句:

用 () 创建表 t1(a int,b string,c int);

用 () 创建表 t2(a int,b string,c int);

问题1:测试发现插入时发现和接收器模式的匹配规则是按照定义的顺序进行

测试语句:

insert into t2 select t1.a,t1.c, t1.b from t1;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题2:支持Insert into tableName(col1[,col2]) select col1[,col2]的语法,但并 没有自己,还是按照定义的顺序进行匹配

测试语句:

插入 t2(a,c,b) 从 t1 中选择 t1.a,t1.c, t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题3:当插入到你的字段比sink的模式中的字段少的时候也一样

测试语句:

插入 t2(a,b)

从 t1 中选择 t1.a、t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Iterator.scala:891)

在 scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

在 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Iterable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

总结:

现在的实现限制了查询的和写人的知识,

只能找到schema定义的字段顺序才能正确的插入,

当很多时候会比较麻烦,

还有,只能插入部分列的需求也是存在的,目前不能支持*来自志愿者整理的flink邮件归档



参考答案:

你好,

Flink 目前已经不支持这个语法……我创建了一个问题[1],可以在里面查出这个特性的进展。

[1] https://issues.apache.org/jira/browse/FLINK-18726*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371505?spm=a2c6h.13066369.question.42.6ad26382kqhSrw

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
关系型数据库 MySQL Java
【Azure 应用服务】App Service 无法连接到Azure MySQL服务,报错:com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
【Azure 应用服务】App Service 无法连接到Azure MySQL服务,报错:com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
385 0
|
1月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
369 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版