Flink SQL 问题之重启报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。

问题一:FlinkSQL新增operatoer后基于savepoint重启报错怎么办?


原先sql任务是: CREATE TABLE A_source(...) CREATE TABLE B_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为

CREATE TABLE A_source(...) CREATE TABLE B_sink (...) CREATE TABLE C_source(...) CREATE TABLE D_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ;

INSERT INTO C_sink SELECT 1 FROM D_source ; 并基于Savepoint提交,结果显示

Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

想请教一下底层是因为什么原因导致了opertor匹配不上? *来自志愿者整理的flink邮件归档


参考回答:

可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

如有错误,欢迎补充回答。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370775?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


问题二:flink sql jdbc 只要含有group by 语句就会报错怎么办?


|INSERT INTO pvuv_sink |SELECT | user_id,item_id,category_id,count(1) as cnt |FROM user_log |group by user_id,item_id,category_id 以上是报错得sql 11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1 java.sql.SQLException: No operations allowed after statement closed.

正常sql,可以插入mysql |INSERT INTO pvuv_sink |SELECT | user_id,item_id,category_id,1

来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档


参考回答:

看异常感觉是连接超时导致的,有一个相关的jira,https://issues.apache.org/jira/browse/FLINK-16681


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/366237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


问题三:FlinkSql 插入hbase group by 多个字段的时候发现报错


报错如下: **Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated. ** 代码如下:

tableEnv.connect(new kafka()
. property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"30.23.13.231:9092")
      .property("zookeeper.connect","....:2181"))
      .withFormat(new Json())
      .withSchema(new Schema().field("componentType",Types.STRING)
                              .field("endDate",Types.STRING)
                              .field("envName",Types.STRING)
                              .field("resultId",Types.STRING)
                              .field("spendTime",Types.INT)
                              .field("returnDataNum",Types.INT)
                              .field("startDate",Types.STRING)
                              .field("tableName",Types.STRING)
                              .field("tenantName",Types.STRING))
      .inAppendMode()
      .createTemporaryTable("MyTable")
    val hbaseDDL :String =
      """
        |Create table flink_log1 (
        |rowkey string,
        |cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
        |) with(
        |   'connector.type' = 'hbase',
        |   'connector.version' = '1.4.3',
        |   'connector.table-name' = 'flink_log1',
        |   'connector.zookeeper.quorum' = '....:2181,....:2181',
        |   'connector.zookeeper.znode.parent' = '/hbase',
        |   'connector.write.buffer-flush.max-size' = '10mb',
        |   'connector.write.buffer-flush.max-rows' = '1000'
        |)
      """.stripMargin
    tableEnv.sqlUpdate(hbaseDDL)
    val sql =
        "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
        "count(tenantName) as f1 ," +
        "count(case when resultId =  '2' then resultId else '' end) as f2 ,"+
        "avg(spendTime) as f3 ,"+
        "sum(returnDataNum) as f4 ,"+
        "count(case when resultId =  '1' then tenantName else '' end) as f5 ,"+
        "tenantName "+
        "from MyTable where substring(endDate,1,10)='2020-06-28' " +
        "group by CONCAT_WS('_',tenantName,tenantName),tenantName"
    val table: Table = tableEnv.sqlQuery(sql)
    tableEnv.createTemporaryView("tmp",table)
    tableEnv.sqlUpdate("insert into flink_log1 " +
      "select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
    streamEnv.execute("my insert hbase sql")


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/310237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


问题四:flink sql 使用自定义函数 返回嵌套行,查询报错 scala.MatchError,为什么?


目前无法通过t.* 将嵌套的字段查询出来。 val schema = tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin).getTableSchema

其中自定义函数 EvtParser 定义如下。

@DataTypeHint("ROW") def eval(line: String) = { ..... }

详细报错信息:

java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#63:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser(3).evt,EvtParser(3).evt,EvtParser(3).evt, EvtParser(3).app, EvtParser(3).uid,EvtParser(3).uid,EvtParser(3).uid, EvtParser(3).ts, EvtParser(3).url,EvtParser(3).url,EvtParser(3).url, EvtParser(3).action]), rel#1:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, parsed_nginx_log])]

at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) at org.apache.calcite.tools.ProgramsRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312)atorg.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramRuleSetProgram.run(Programs.java:312) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62)atorg.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgramanonfunanonfunanonfunoptimize1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58)atscala.collection.TraversableOnce1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157)atscala.collection.TraversableOnce1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnceanonfunanonfunanonfunfoldLeft1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157)atscala.collection.Iterator1.apply(TraversableOnce.scala:157) at scala.collection.Iteratorclass.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLikeclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72)atscala.collection.AbstractIterable.foreach(Iterable.scala:54)atscala.collection.TraversableOnceclass.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnceclass.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) at cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChainValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131)atorg.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)atorg.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)atorg.junit.jupiter.engine.execution.ExecutableInvokerValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvokerReflectiveInterceptorCall.lambdaofVoidMethodofVoidMethodofVoidMethod0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambdainvokeinvokeinvoke0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChainInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)atorg.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)atorg.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdaInterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambdainvokeTestMethod6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)atorg.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:151) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atjava.util.ArrayList.forEach(ArrayList.java:1257)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda6(NodeTestTask.java:155) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141)atorg.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.lambda8(NodeTestTask.java:141) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambdaexecuteRecursively9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139)atorg.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)atorg.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)atorg.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)atorg.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambdaexecute0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)atorg.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)atorg.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)atorg.junit.platform.launcher.core.DefaultLauncherSession0(EngineExecutionOrchestrator.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) at org.junit.platform.launcher.core.DefaultLauncherSessionDelegatingLauncher.execute(DefaultLauncherSession.java:86) at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) at com.intellij.rt.junit.IdeaTestRunnerRepeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)atcom.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)atcom.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)Causedby:scala.MatchError:EvtParser(Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: scala.MatchError: EvtParser(3) (of class org.apache.calcite.rex.RexCall) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)

class EvtEtlJobTest {

@Test def testMain(): Unit = { val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build val tEnv = TableEnvironment.create(settings)

tEnv.executeSql( """ | CREATE TABLE parsed_nginx_log ( | remote_addr STRING, | remote_user STRING, | time_local BIGINT, | request STRING, | status STRING, | body_bytes_sent STRING, | http_referer STRING, | http_user_agent STRING, | http_x_forwarded_for STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = 'src/test/resources/nginx-parsed.log', | 'format' = 'json' | ) |""".stripMargin).getTableSchema

tEnv.executeSql( """ | CREATE TABLE evt_log_index ( | evt STRING, | app STRING, | uid STRING, | ts STRING, | url STRING, | action STRING | ) WITH ( | 'connector' = 'filesystem', | 'path' = '/tmp/evt_log_index', | 'format' = 'json' | ) |""".stripMargin)

// 注册函数 tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])

val schema = tEnv.executeSql( """ | SELECT t.* FROM ( | SELECT EvtParser(request) as t FROM parsed_nginx_log | ) |""".stripMargin).getTableSchema

println("++++++++", schema) }

}*来自志愿者整理的flink


参考回答:

我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/364670?spm=a2c6h.14164896.0.0.382263bfYd1g6h


问题五:flink 1.11 sql作业提交JM报错


我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more*来自志愿者整理的flink邮件归档


参考回答:

1.11 对 StreamTableEnvironment.execute()

和 StreamExecutionEnvironment.execute() 的执行方式有所调整,

简单概述为:

  1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
  2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
  3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业

(异步提交作业),不需要再调用 StreamTableEnvironment.execute()

或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]

对于 “No operators defined in streaming topology.”,如果使用

TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用

StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()

提交作业,就会出现前面的错误。

对于

“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372274?spm=a2c6h.14164896.0.0.382263bfYd1g6h


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
788 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
284 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8月前
|
SQL Java 数据库连接
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
|
4月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
867 1
|
8月前
|
SQL
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
|
8月前
|
SQL Java 数据库连接
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
|
10月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1926 27
|
9月前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
9月前
|
SQL
【YashanDB 知识库】使用 leading hint 调整 SQL 执行计划后报错 YAS-04522 invalid hint leading
在 YashanDB 的所有版本中,使用 leading hint 调整 SQL 执行计划时可能出现“YAS-04522 invalid hint leading”错误,导致 SQL 无法正常执行。原因是 YashanDB 优化器的 Bug。解决方法为避免使用 leading hint。可通过创建测试表 a、b、c 并执行特定 SQL 语句来验证问题是否存在。
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")

相关产品

  • 实时计算 Flink版