Flink SQL Issues: How to Fix Errors When Restarting a Job

Summary: "Flink SQL errors" are the problems encountered when running data processing jobs through Apache Flink's SQL interface. This collection gathers common Flink SQL error cases and their solutions, to help users get their data pipelines back up quickly.

Question 1: What can I do when a Flink SQL job fails to restart from a savepoint after new operators are added?


The original SQL job was:

CREATE TABLE A_source(...) CREATE TABLE B_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ;

After generating a savepoint from this Flink SQL job, I modified it to:

CREATE TABLE A_source(...) CREATE TABLE B_sink (...) CREATE TABLE C_source(...) CREATE TABLE D_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ;

INSERT INTO C_sink SELECT 1 FROM D_source ;

and submitted it from the savepoint. The result was:

Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

What is the underlying reason the operator cannot be matched? *From the volunteer-curated Flink mailing list archive


Reference answer:

You can look at the org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint method to see why loading fails. For this problem, both the documentation and the code offer the same way out: the --allowNonRestoredState option. See the documentation for usage details: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
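In general, the operator IDs of a SQL job are generated from the compiled plan rather than from user-assigned uids, so changing the set of statements can change the generated graph and leave savepoint state that no longer maps onto any operator of the new program. If that state can safely be discarded, the job can be resumed with the CLI option mentioned above, for example (the savepoint path and job jar below are placeholders):

./bin/flink run -s hdfs:///savepoints/savepoint-xxxx --allowNonRestoredState your-sql-job.jar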

If anything above is wrong, corrections and additional answers are welcome.


More answers to this question can be found in the original post: https://developer.aliyun.com/ask/370775?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 2: A Flink SQL JDBC job fails whenever the statement contains GROUP BY — how can I fix it?


The failing SQL is:

INSERT INTO pvuv_sink
SELECT user_id, item_id, category_id, count(1) as cnt
FROM user_log
GROUP BY user_id, item_id, category_id

The error:

11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
java.sql.SQLException: No operations allowed after statement closed.

The following SQL is fine and inserts into MySQL normally:

INSERT INTO pvuv_sink
SELECT user_id, item_id, category_id, 1

*From the volunteer-curated Flink mailing list archive


Reference answer:

Judging from the exception, this looks like a connection timeout. There is a related JIRA: https://issues.apache.org/jira/browse/FLINK-16681
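FLINK-16681 tracks having the JDBC sink re-establish a connection that has gone stale. Until a release containing that fix is available, a common mitigation is to flush more frequently so the MySQL connection does not sit idle long enough for the server to close it. A minimal sketch of the sink DDL using the legacy JDBC descriptor (the URL, credentials, column types and flush settings are placeholders, not taken from the original thread):

val sinkDDL =
  """
    |CREATE TABLE pvuv_sink (
    |  user_id BIGINT,
    |  item_id BIGINT,
    |  category_id BIGINT,
    |  cnt BIGINT
    |) WITH (
    |  'connector.type' = 'jdbc',
    |  'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
    |  'connector.table' = 'pvuv_sink',
    |  'connector.username' = '...',
    |  'connector.password' = '...',
    |  'connector.write.flush.max-rows' = '100',
    |  'connector.write.flush.interval' = '2s',
    |  'connector.write.max-retries' = '3'
    |)
  """.stripMargin
tableEnv.sqlUpdate(sinkDDL)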


More answers to this question can be found in the original post: https://developer.aliyun.com/ask/366237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 3: A Flink SQL insert into HBase fails when the query groups by multiple fields


The error is:

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated.

The code is as follows:

tableEnv.connect(new Kafka()
      .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "30.23.13.231:9092")
      .property("zookeeper.connect", "....:2181"))
      .withFormat(new Json())
      .withSchema(new Schema().field("componentType",Types.STRING)
                              .field("endDate",Types.STRING)
                              .field("envName",Types.STRING)
                              .field("resultId",Types.STRING)
                              .field("spendTime",Types.INT)
                              .field("returnDataNum",Types.INT)
                              .field("startDate",Types.STRING)
                              .field("tableName",Types.STRING)
                              .field("tenantName",Types.STRING))
      .inAppendMode()
      .createTemporaryTable("MyTable")
    val hbaseDDL :String =
      """
        |Create table flink_log1 (
        |rowkey string,
        |cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
        |) with(
        |   'connector.type' = 'hbase',
        |   'connector.version' = '1.4.3',
        |   'connector.table-name' = 'flink_log1',
        |   'connector.zookeeper.quorum' = '....:2181,....:2181',
        |   'connector.zookeeper.znode.parent' = '/hbase',
        |   'connector.write.buffer-flush.max-size' = '10mb',
        |   'connector.write.buffer-flush.max-rows' = '1000'
        |)
      """.stripMargin
    tableEnv.sqlUpdate(hbaseDDL)
    val sql =
        "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
        "count(tenantName) as f1 ," +
        "count(case when resultId =  '2' then resultId else '' end) as f2 ,"+
        "avg(spendTime) as f3 ,"+
        "sum(returnDataNum) as f4 ,"+
        "count(case when resultId =  '1' then tenantName else '' end) as f5 ,"+
        "tenantName "+
        "from MyTable where substring(endDate,1,10)='2020-06-28' " +
        "group by CONCAT_WS('_',tenantName,tenantName),tenantName"
    val table: Table = tableEnv.sqlQuery(sql)
    tableEnv.createTemporaryView("tmp",table)
    tableEnv.sqlUpdate("insert into flink_log1 " +
      "select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
    streamEnv.execute("my insert hbase sql")
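For reference, a rewrite that is often suggested for this kind of "full primary keys" error (a sketch only, not taken from the original thread): compute the key in a sub-query and GROUP BY the resulting plain column, so that the single group-by key lines up with the HBase rowkey; the non-key column tenantName is then taken with an aggregate such as MAX:

    val sql =
      """
        |SELECT tm_tenantName,
        |       count(tenantName) as f1,
        |       count(case when resultId = '2' then resultId else '' end) as f2,
        |       avg(spendTime) as f3,
        |       sum(returnDataNum) as f4,
        |       count(case when resultId = '1' then tenantName else '' end) as f5,
        |       max(tenantName) as tenantName
        |FROM (
        |  SELECT CONCAT_WS('_', tenantName, tenantName) AS tm_tenantName,
        |         tenantName, resultId, spendTime, returnDataNum
        |  FROM MyTable
        |  WHERE substring(endDate,1,10) = '2020-06-28'
        |) t
        |GROUP BY tm_tenantName
      """.stripMargin

The insert into flink_log1 statement can stay as it is.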


More answers to this question can be found in the original post: https://developer.aliyun.com/ask/310237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 4: Why does a Flink SQL query that uses a user-defined function returning a nested row fail with scala.MatchError?


Currently I cannot select the nested fields via t.*:

val schema = tEnv.executeSql(
  """
    | SELECT t.* FROM (
    |   SELECT EvtParser(request) as t FROM parsed_nginx_log
    | )
    |""".stripMargin).getTableSchema

The user-defined function EvtParser is defined as follows:

@DataTypeHint("ROW")
def eval(line: String) = { ..... }

The detailed error message:

java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#63:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser($3).evt, EvtParser($3).app, EvtParser($3).uid, EvtParser($3).ts, EvtParser($3).url, EvtParser($3).action]), rel#1:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, parsed_nginx_log])]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51)
    ... (JUnit and IDE runner frames omitted)
Caused by: scala.MatchError: EvtParser($3) (of class org.apache.calcite.rex.RexCall)
    at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)

class EvtEtlJobTest {

  @Test
  def testMain(): Unit = {
    val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        | CREATE TABLE parsed_nginx_log (
        |   remote_addr STRING,
        |   remote_user STRING,
        |   time_local BIGINT,
        |   request STRING,
        |   status STRING,
        |   body_bytes_sent STRING,
        |   http_referer STRING,
        |   http_user_agent STRING,
        |   http_x_forwarded_for STRING
        | ) WITH (
        |   'connector' = 'filesystem',
        |   'path' = 'src/test/resources/nginx-parsed.log',
        |   'format' = 'json'
        | )
        |""".stripMargin).getTableSchema

    tEnv.executeSql(
      """
        | CREATE TABLE evt_log_index (
        |   evt STRING,
        |   app STRING,
        |   uid STRING,
        |   ts STRING,
        |   url STRING,
        |   action STRING
        | ) WITH (
        |   'connector' = 'filesystem',
        |   'path' = '/tmp/evt_log_index',
        |   'format' = 'json'
        | )
        |""".stripMargin)

    // register the function
    tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])

    val schema = tEnv.executeSql(
      """
        | SELECT t.* FROM (
        |   SELECT EvtParser(request) as t FROM parsed_nginx_log
        | )
        |""".stripMargin).getTableSchema

    println("++++++++", schema)
  }
}

*From the volunteer-curated Flink mailing list archive


Reference answer:

I verified your case on the 1.13 branch and it runs fine. I suggest cherry-picking that patch into your own branch and verifying again.
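Independent of the planner patch, declaring the full nested row type on the function may also help, since the planner then knows every field of the returned row. A sketch of what the UDF could look like (field names taken from the evt_log_index table above; the parsing logic itself is a placeholder):

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class EvtParser extends ScalarFunction {

  // declare the complete nested row type instead of a bare "ROW"
  @DataTypeHint("ROW<evt STRING, app STRING, uid STRING, ts STRING, url STRING, action STRING>")
  def eval(line: String): Row = {
    // placeholder: a real implementation parses `line` into the six fields
    Row.of("", "", "", "", "", "")
  }
}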


More answers to this question can be found in the original post: https://developer.aliyun.com/ask/364670?spm=a2c6h.14164896.0.0.382263bfYd1g6h


Question 5: A Flink 1.11 SQL job reports an error in the JobManager log on submission


I submit a SQL job with Flink 1.11 and see the following exception in the JobManager log. In the job I execute several DDL statements through tEnv.executeSql, add several DML statements through tEnv.createStatementSet, and then call execute. What could be causing this exception? A second question: although the exception is thrown, the job still starts and runs normally — why is that? Is it not recommended to use executeSql and StatementSet.execute together in the same job?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0]
    ... 24 more

*From the volunteer-curated Flink mailing list archive


Reference answer:

Flink 1.11 changed how StreamTableEnvironment.execute() and StreamExecutionEnvironment.execute() behave. In short:

  1. StreamTableEnvironment.execute() can only run jobs that were defined through the sqlUpdate and insertInto methods;
  2. once a Table has been converted to a DataStream, the job can only be run via StreamExecutionEnvironment.execute();
  3. the newly introduced TableEnvironment.executeSql() and StatementSet.execute() methods execute SQL jobs directly (the job is submitted asynchronously), so there is no need to call StreamTableEnvironment.execute() or StreamExecutionEnvironment.execute() afterwards.

See [1] and [2] for details; a minimal sketch of the recommended pattern follows the references below.

As for "No operators defined in streaming topology.": this error appears when a job has already been submitted with TableEnvironment.executeSql() or StatementSet.execute() and you then call StreamTableEnvironment.execute() or StreamExecutionEnvironment.execute() to submit it again.

As for "Is it not recommended to use executeSql and StatementSet.execute in the same job?": no, that is not the case — executeSql and StatementSet do not interfere with each other. Regarding the error you saw, could you describe the job submission flow in more detail?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
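For reference, a minimal sketch of the pattern described above (the table definitions use placeholder datagen/print connectors): run all DDL through executeSql, collect the DML in one StatementSet, and submit it once via StatementSet.execute(), without any further execute() call:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object StatementSetSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // DDL statements are executed immediately by executeSql
    tEnv.executeSql("CREATE TABLE A_source (user_id BIGINT) WITH ('connector' = 'datagen')")
    tEnv.executeSql("CREATE TABLE B_sink (user_id BIGINT) WITH ('connector' = 'print')")

    // collect all DML in a single StatementSet and submit it as one job
    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsertSql("INSERT INTO B_sink SELECT user_id FROM A_source")
    stmtSet.execute()  // asynchronous submission; no StreamExecutionEnvironment.execute() afterwards
  }
}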


More answers to this question can be found in the original post: https://developer.aliyun.com/ask/372274?spm=a2c6h.14164896.0.0.382263bfYd1g6h

