Flink报错问题之执行sqlQuery报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:[讨论] 为 sql-client 添加 '--filename' 参数

我想定期执行一些flink sql批处理作业,例如'insert into select .....',但目前我找不到合适的方法,所以参考 hive,我更改了源代码并添加了一个“--filename”参数,以便 我们可以执行一个sql文件。

像这样:

/home/flink/bin/sql-client.sh 内嵌 -f flink.sql

对于这个功能社区有什么想法或计划吗?*来自志愿者整理的flink邮件归档



参考答案:

嗨君,

目前,sql 客户端已经支持 -u 选项,就像: ./bin/sql-client.sh 嵌入 -u "insert_statement"。

已经有一个 JIRA [1] 想要支持 -f 选项

[1] https://issues.apache.org/jira/browse/FLINK-12828*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371492?spm=a2c6h.13066369.question.33.6ad26382umztJa



问题二:解析kafka的mysql binlog问题

解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +

")"; 你好测试代码如下

私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据 VARCHAR , " + " table VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {

//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);

tableResult.print();

Table table = bsTableEnv.sqlQuery("select * from kafkaTable");

bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

bsEnv.execute("aa");

}

输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单

1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档



参考答案:

有kafka 中json 数据的样例不? 有没有任务管理器有没有异常日志信息? 因为“数据”是一个复杂的结构,不是容易的字符串结构。所以1.11至今,这个功能还不支持。 1.12中已经支持读取复杂结构为字符串类型了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371494?spm=a2c6h.13066369.question.32.6ad26382BFaCOL



问题三:Flink SQL 解析复杂(嵌套)JSON 的问题以及写入到 hive 映射问题

json格式,如果是一个json数组的方法定义模式,数组里还可能存在隐藏json数组的情况。

如数据: {"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx" ,"user_id222":"0022"},{"name333":"name3333","user_id222":"user3333"},{"cc":"xxx333","user_id444":"user4444","name444":" name4444"}]}

参照:https://www.cnblogs.com/Springmoon-venn/p/12664547.html 需要模式这样定义: user_info 定义成:ROW<user_id STRING, name STRING> jsonArray 定义成: ARRAY<ROW<user_id222 STRING, name222 STRING>>

问题是: 如果json数组还有一个数组可以编码定义吗?这个数据也是要写入到hive,数组里,数组 ,怎么映射成Hive类型,映射成数组 ,这种情况的json怎么处理? 有没有什么办法直接把json数组,直接映射成数组 ,试过发现不行,怎么处理这种复杂类型。*来自志愿者整理的flink邮件归档



参考答案:

json格式有一个问题在解这个问题[1],可以把jsonNode强制转成字符串,1.12里会支持,可以看下。

最好的事物 伦纳德 [1] https://issues.apache.org/jira/browse/FLINK-18002 < https://issues.apache.org/jira/browse/FLINK-18002 >*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371495?spm=a2c6h.13066369.question.33.6ad26382fjcYid



问题四:flink1.11.0 执行sqlQuery提示NullPointException

大家好: 我正在为公司之前基于flink1.10的网关升级flink版本到1.11,用的hive目录,建表后,执行sqlQuery方法提示NullPointException,希望给出排错建议,具体报错信息如下: 引起:java.lang.NullPointerException 在 java.util.Objects.requireNonNull(Objects.java:203) 在 org.apache.calcite.rel.metadata.RelMetadataQuery. (RelMetadataQuery.java:141) 在 org.apache.calcite.rel.metadata.RelMetadataQuery. (RelMetadataQuery.java:106) 在 org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery. (FlinkRelMetadataQuery.java:73) 在 org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52) 在 org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) 在 org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) 在 org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) 在 org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118) 在 org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111) 在 org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180) 在 org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462) 在 org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256) 在 org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) 在 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81) 在 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73) 在 org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93) 在 org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) 在 org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) 在 org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) 在 org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) 在 org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375) 在 org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75) 在 org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) 在 org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) 在 org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) 在 org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) 在 org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) 在 org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112) 在 org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184) 在 org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) 在 org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) 在 org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) 在 org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)*来自志愿者整理的flink邮件归档



参考答案:

这个问题只能说是使用表环境恶化的问题。ververica的网关的模式本质就是多线程。 创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被那个的。 简单来说就是: 当session创建的时候,worker thread1会创建一个tableEnvironment, 然后当其他该会话请求来自外部的时候,可能是worker thread2使用该TableEnvironment执行sql。

实际上这个就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371496?spm=a2c6h.13066369.question.36.6ad263828ZE0cm



问题五:flink1.11.1使用Table API Hive方言的executSql报错

大家好: 我基于Flink1.11.1的表API使用Hive方言,调用executSql方法后报错,触发信息如下: org.apache.flink.client.program.ProgramInvocationException:主要方法 导致错误:无法执行sql 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.program.PackagedProgram .invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2. 11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java: 149)~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.deployment.application。 DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.deployment.application。 DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.runtime.webmonitor.handlers.JarRunHandler .lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1 .jar:1.11.1] 在 java.util.concurrent.CompletableFuture$AsyncSupply.run( CompletableFuture.java:1604) [?:1.8.0_242] 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: [511] [?:1.8.0_242] 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_242 ] 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242] 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor .java:1149) [?:1.8.0_242] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) [?:1.8.0_242] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_242] 引起:org.apache.flink.table.api.TableException: Failed to execute sql 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] 在 org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] 在 java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1] ……还有 13 个 引起:java.lang.IllegalArgumentException:Job client must be a 协调请求网关。这是一个错误。 在 org.apache.flink.util.Preconditions.checkArgument(Preconditions.java: 139)~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.streaming.api.operators.collect.CollectResultFetcher .setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar: 1.11.1] 在 org.apache.flink.streaming.api.operators.collect。 CollectResultIterator.setJobClient(CollectResultIterator.java:84) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.table.planner.sinks.SelectTableSinkBase .setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1 .jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] 在 org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] 在 java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

*核心错误 作业客户端必须是 CoordinationRequestGateway。这是一个错误。 难道这是一个Bug吗?*来自志愿者整理的flink邮件归档



参考答案:

你的包是完整的flink-1.11.1的包吗? 例如检查一下 ClusterClientJobClientAdapter 这个类是否继承了 CoordinationRequestGateway ?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371497?spm=a2c6h.13066369.question.37.6ad26382OsZxRi

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
14天前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
69 2
|
2月前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
29 2
|
2月前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之使用3.0测试mysql到starrocks启动报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
598 5
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1628 2
官宣|Apache Flink 1.19 发布公告
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
289 3

相关产品

  • 实时计算 Flink版