问题一:[讨论] 为 sql-client 添加 '--filename' 参数
我想定期执行一些flink sql批处理作业,例如'insert into select .....',但目前我找不到合适的方法,所以参考 hive,我更改了源代码并添加了一个“--filename”参数,以便 我们可以执行一个sql文件。
像这样:
/home/flink/bin/sql-client.sh 内嵌 -f flink.sql
对于这个功能社区有什么想法或计划吗?*来自志愿者整理的flink邮件归档
参考答案:
嗨君,
目前,sql 客户端已经支持 -u 选项,就像: ./bin/sql-client.sh 嵌入 -u "insert_statement"。
已经有一个 JIRA [1] 想要支持 -f 选项
[1] https://issues.apache.org/jira/browse/FLINK-12828*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371492?spm=a2c6h.13066369.question.33.6ad26382umztJa
问题二:解析kafka的mysql binlog问题
解析kafka的mysql binlog问题 你好。这是我的解析sql。我想读取binlog的数据数据和表数据。为什么可以取到表不能取到数据呢?
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" +
")"; 你好测试代码如下
私有静态最终字符串 KAFKA_SQL = "创建表 kafkaTable (\n" + " 数据
VARCHAR , " + " table
VARCHAR " + ") WITH (" + " '连接器' = '卡夫卡'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " '格式' = 'json'," + " 'scan.startup.mode' = '最早的偏移'" + ")"; public static void main(String[] args) 抛出异常 {
//绑定表 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
tableResult.print();
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
bsEnv.execute("aa");
}
输出结果如下数据都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,订单
1.11版本blink datastream转表吗? 看到的例子都是用OldPlanner来转表的。 致谢*来自志愿者整理的flink邮件归档
参考答案:
有kafka 中json 数据的样例不? 有没有任务管理器有没有异常日志信息? 因为“数据”是一个复杂的结构,不是容易的字符串结构。所以1.11至今,这个功能还不支持。 1.12中已经支持读取复杂结构为字符串类型了。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371494?spm=a2c6h.13066369.question.32.6ad26382BFaCOL
问题三:Flink SQL 解析复杂(嵌套)JSON 的问题以及写入到 hive 映射问题
json格式,如果是一个json数组的方法定义模式,数组里还可能存在隐藏json数组的情况。
如数据: {"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx" ,"user_id222":"0022"},{"name333":"name3333","user_id222":"user3333"},{"cc":"xxx333","user_id444":"user4444","name444":" name4444"}]}
参照:https://www.cnblogs.com/Springmoon-venn/p/12664547.html 需要模式这样定义: user_info 定义成:ROW<user_id STRING, name STRING> jsonArray 定义成: ARRAY<ROW<user_id222 STRING, name222 STRING>>
问题是: 如果json数组还有一个数组可以编码定义吗?这个数据也是要写入到hive,数组里,数组 ,怎么映射成Hive类型,映射成数组 ,这种情况的json怎么处理? 有没有什么办法直接把json数组,直接映射成数组 ,试过发现不行,怎么处理这种复杂类型。*来自志愿者整理的flink邮件归档
参考答案:
json格式有一个问题在解这个问题[1],可以把jsonNode强制转成字符串,1.12里会支持,可以看下。
最好的事物 伦纳德 [1] https://issues.apache.org/jira/browse/FLINK-18002 < https://issues.apache.org/jira/browse/FLINK-18002 >*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371495?spm=a2c6h.13066369.question.33.6ad26382fjcYid
问题四:flink1.11.0 执行sqlQuery提示NullPointException
大家好: 我正在为公司之前基于flink1.10的网关升级flink版本到1.11,用的hive目录,建表后,执行sqlQuery方法提示NullPointException,希望给出排错建议,具体报错信息如下: 引起:java.lang.NullPointerException 在 java.util.Objects.requireNonNull(Objects.java:203) 在 org.apache.calcite.rel.metadata.RelMetadataQuery. (RelMetadataQuery.java:141) 在 org.apache.calcite.rel.metadata.RelMetadataQuery. (RelMetadataQuery.java:106) 在 org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery. (FlinkRelMetadataQuery.java:73) 在 org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52) 在 org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39) 在 org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38) 在 org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178) 在 org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118) 在 org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111) 在 org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180) 在 org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462) 在 org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256) 在 org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) 在 org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) 在 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81) 在 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73) 在 org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93) 在 org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119) 在 org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83) 在 org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) 在 org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) 在 org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375) 在 org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75) 在 org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) 在 org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) 在 org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) 在 org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) 在 org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) 在 org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112) 在 org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184) 在 org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) 在 org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) 在 org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) 在 org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) 在 org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) 在 org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)*来自志愿者整理的flink邮件归档
参考答案:
这个问题只能说是使用表环境恶化的问题。ververica的网关的模式本质就是多线程。 创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被那个的。 简单来说就是: 当session创建的时候,worker thread1会创建一个tableEnvironment, 然后当其他该会话请求来自外部的时候,可能是worker thread2使用该TableEnvironment执行sql。
实际上这个就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371496?spm=a2c6h.13066369.question.36.6ad263828ZE0cm
问题五:flink1.11.1使用Table API Hive方言的executSql报错
大家好: 我基于Flink1.11.1的表API使用Hive方言,调用executSql方法后报错,触发信息如下: org.apache.flink.client.program.ProgramInvocationException:主要方法 导致错误:无法执行sql 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.program.PackagedProgram .invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2. 11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java: 149)~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.deployment.application。 DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.client.deployment.application。 DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.runtime.webmonitor.handlers.JarRunHandler .lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1 .jar:1.11.1] 在 java.util.concurrent.CompletableFuture$AsyncSupply.run( CompletableFuture.java:1604) [?:1.8.0_242] 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: [511] [?:1.8.0_242] 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_242 ] 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242] 在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor .java:1149) [?:1.8.0_242] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) [?:1.8.0_242] 在 java.lang.Thread.run(Thread.java:748) [?:1.8.0_242] 引起:org.apache.flink.table.api.TableException: Failed to execute sql 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] 在 org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] 在 java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1] ……还有 13 个 引起:java.lang.IllegalArgumentException:Job client must be a 协调请求网关。这是一个错误。 在 org.apache.flink.util.Preconditions.checkArgument(Preconditions.java: 139)~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.streaming.api.operators.collect.CollectResultFetcher .setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar: 1.11.1] 在 org.apache.flink.streaming.api.operators.collect。 CollectResultIterator.setJobClient(CollectResultIterator.java:84) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 在 org.apache.flink.table.planner.sinks.SelectTableSinkBase .setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1 .jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] 在 org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] 在 org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] 在 sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] 在 java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] 在 org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
*核心错误 作业客户端必须是 CoordinationRequestGateway。这是一个错误。
难道这是一个Bug吗?*来自志愿者整理的flink邮件归档
参考答案:
你的包是完整的flink-1.11.1的包吗? 例如检查一下 ClusterClientJobClientAdapter 这个类是否继承了 CoordinationRequestGateway ?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371497?spm=a2c6h.13066369.question.37.6ad26382OsZxRi