Flink SQL Issues: How to Resolve Errors on Restart

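Summary: Flink SQL errors are problems encountered when running data processing jobs through Apache Flink's SQL interface. This collection gathers common Flink SQL errors and their solutions to help users restore their data pipelines quickly.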

Question 1: What should I do when a Flink SQL job fails to restart from a savepoint after a new operator is added?


The original SQL job was:

CREATE TABLE A_source (...);
CREATE TABLE B_sink (...);
INSERT INTO B_sink SELECT 1 FROM A_source;

After taking a savepoint of this Flink SQL job, I changed it to:

CREATE TABLE A_source (...);
CREATE TABLE B_sink (...);
CREATE TABLE C_source (...);
CREATE TABLE D_sink (...);
INSERT INTO B_sink SELECT 1 FROM A_source;
INSERT INTO D_sink SELECT 1 FROM C_source;

and submitted it from the savepoint. The result was:

Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

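What is the underlying reason the operator cannot be matched? (From the Flink mailing list archive, compiled by volunteers.)


Reference answer:

To see why loading failed, look at the method org.apache.flink.runtime.checkpoint.Checkpoints#loadAndValidateCheckpoint. Both the documentation and the code offer a way around this problem: the --allowNonRestoredState option. For usage details, see the documentation: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html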
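
The check the answer points to can be illustrated with a small model. The sketch below is not Flink's code; it is a simplified, self-contained Scala model of the rule the error message describes: state stored under an operator ID in the savepoint must find an operator with the same ID in the new job, unless non-restored state is explicitly allowed. `SavepointOperator`, `restore`, and the second operator ID are hypothetical names and values used only for illustration, and treating stateless leftovers as harmless is an assumption of the model. When IDs are not assigned manually, Flink generates them from the structure of the program, so a change to the set of SQL statements can plausibly change the generated IDs.

```scala
// Simplified model of savepoint-to-job state matching (NOT Flink's actual implementation).
final case class SavepointOperator(operatorId: String, hasState: Boolean)

object SavepointRestoreModel {

  /** Returns the operator IDs whose state would be restored, or Left(error message). */
  def restore(
      savepoint: Seq[SavepointOperator],
      newJobOperatorIds: Set[String],
      allowNonRestoredState: Boolean): Either[String, Seq[String]] = {
    val (matched, unmatched) =
      savepoint.partition(op => newJobOperatorIds.contains(op.operatorId))
    // Assumption of this model: leftover operators without any state are harmless.
    val orphaned = unmatched.filter(_.hasState)
    if (orphaned.nonEmpty && !allowNonRestoredState)
      Left(s"Cannot map checkpoint/savepoint state for operator ${orphaned.head.operatorId} to the new program")
    else
      Right(matched.map(_.operatorId)) // orphaned state is skipped, not restored
  }

  def main(args: Array[String]): Unit = {
    val savepoint = Seq(SavepointOperator("2e9c6b0c053878cef673bbe7d94ab037", hasState = true))
    // Hypothetical: the modified SQL job ended up with a different generated operator ID.
    val newJob = Set("ffffffffffffffffffffffffffffffff")
    println(restore(savepoint, newJob, allowNonRestoredState = false))
    println(restore(savepoint, newJob, allowNonRestoredState = true))
  }
}
```

On the real CLI the switch is passed when resubmitting from the savepoint, for example `flink run -s <savepointPath> --allowNonRestoredState ...`. State that is skipped this way is not restored into the new job.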

Corrections and additional answers are welcome.


For more answers to this question, see the original post: https://developer.aliyun.com/ask/370775?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 2: What should I do when the Flink SQL JDBC sink fails on every statement that contains GROUP BY?


The following SQL fails:

INSERT INTO pvuv_sink
SELECT user_id, item_id, category_id, count(1) AS cnt
FROM user_log
GROUP BY user_id, item_id, category_id

with this error:

11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
java.sql.SQLException: No operations allowed after statement closed.

The following SQL works and inserts into MySQL:

INSERT INTO pvuv_sink
SELECT user_id, item_id, category_id, 1

(From the Flink mailing list archive, compiled by volunteers.)


Reference answer:

Judging from the exception, this looks like it is caused by a connection timeout. There is a related JIRA issue: https://issues.apache.org/jira/browse/FLINK-16681


For more answers to this question, see the original post: https://developer.aliyun.com/ask/366237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 3: Flink SQL reports an error when inserting into HBase with GROUP BY on multiple fields


The error is:

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated.

The code is:

    tableEnv.connect(new Kafka()
      .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"30.23.13.231:9092")
      .property("zookeeper.connect","....:2181"))
      .withFormat(new Json())
      .withSchema(new Schema().field("componentType",Types.STRING)
                              .field("endDate",Types.STRING)
                              .field("envName",Types.STRING)
                              .field("resultId",Types.STRING)
                              .field("spendTime",Types.INT)
                              .field("returnDataNum",Types.INT)
                              .field("startDate",Types.STRING)
                              .field("tableName",Types.STRING)
                              .field("tenantName",Types.STRING))
      .inAppendMode()
      .createTemporaryTable("MyTable")
    val hbaseDDL :String =
      """
        |Create table flink_log1 (
        |rowkey string,
        |cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
        |) with(
        |   'connector.type' = 'hbase',
        |   'connector.version' = '1.4.3',
        |   'connector.table-name' = 'flink_log1',
        |   'connector.zookeeper.quorum' = '....:2181,....:2181',
        |   'connector.zookeeper.znode.parent' = '/hbase',
        |   'connector.write.buffer-flush.max-size' = '10mb',
        |   'connector.write.buffer-flush.max-rows' = '1000'
        |)
      """.stripMargin
    tableEnv.sqlUpdate(hbaseDDL)
    val sql =
        "select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
        "count(tenantName) as f1 ," +
        "count(case when resultId =  '2' then resultId else '' end) as f2 ,"+
        "avg(spendTime) as f3 ,"+
        "sum(returnDataNum) as f4 ,"+
        "count(case when resultId =  '1' then tenantName else '' end) as f5 ,"+
        "tenantName "+
        "from MyTable where substring(endDate,1,10)='2020-06-28' " +
        "group by CONCAT_WS('_',tenantName,tenantName),tenantName"
    val table: Table = tableEnv.sqlQuery(sql)
    tableEnv.createTemporaryView("tmp",table)
    tableEnv.sqlUpdate("insert into flink_log1 " +
      "select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
    streamEnv.execute("my insert hbase sql")


For more answers to this question, see the original post: https://developer.aliyun.com/ask/310237?spm=a2c6h.14164896.0.0.4a6263bfEP5Bvd


Question 4: Why does a Flink SQL query fail with scala.MatchError when a user-defined function returns a nested row?


Nested fields currently cannot be selected with t.*:

val schema = tEnv.executeSql(
  """
    | SELECT t.* FROM (
    |   SELECT EvtParser(request) as t FROM parsed_nginx_log
    | )
    |""".stripMargin).getTableSchema

The user-defined function EvtParser is defined as follows:

@DataTypeHint("ROW")
def eval(line: String) = { ..... }

Detailed error message:

java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#63:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser($3).evt, EvtParser($3).app, EvtParser($3).uid, EvtParser($3).ts, EvtParser($3).url, EvtParser($3).action]), rel#1:LogicalTableScan.NONE.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, parsed_nginx_log])]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51)
    ... (reflection, JUnit Platform and IntelliJ test-runner frames omitted)
Caused by: scala.MatchError: EvtParser($3) (of class org.apache.calcite.rex.RexCall)
    at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.junit.jupiter.api.Test

class EvtEtlJobTest {

  @Test
  def testMain(): Unit = {
    val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build
    val tEnv = TableEnvironment.create(settings)

    // Source table: parsed nginx log lines read from a JSON file
    tEnv.executeSql(
      """
        | CREATE TABLE parsed_nginx_log (
        |   remote_addr STRING,
        |   remote_user STRING,
        |   time_local BIGINT,
        |   request STRING,
        |   status STRING,
        |   body_bytes_sent STRING,
        |   http_referer STRING,
        |   http_user_agent STRING,
        |   http_x_forwarded_for STRING
        | ) WITH (
        |   'connector' = 'filesystem',
        |   'path' = 'src/test/resources/nginx-parsed.log',
        |   'format' = 'json'
        | )
        |""".stripMargin).getTableSchema

    // Sink table for the parsed events
    tEnv.executeSql(
      """
        | CREATE TABLE evt_log_index (
        |   evt STRING,
        |   app STRING,
        |   uid STRING,
        |   ts STRING,
        |   url STRING,
        |   action STRING
        | ) WITH (
        |   'connector' = 'filesystem',
        |   'path' = '/tmp/evt_log_index',
        |   'format' = 'json'
        | )
        |""".stripMargin)

    // Register the function
    tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])

    val schema = tEnv.executeSql(
      """
        | SELECT t.* FROM (
        |   SELECT EvtParser(request) as t FROM parsed_nginx_log
        | )
        |""".stripMargin).getTableSchema

    println("++++++++", schema)
  }

}

(From the Flink mailing list archive, compiled by volunteers.)


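Reference answer:

I verified your case on the 1.13 branch and it runs successfully. I suggest cherry-picking that patch onto your own branch and verifying again.


For more answers to this question, see the original post: https://developer.aliyun.com/ask/364670?spm=a2c6h.14164896.0.0.382263bfYd1g6h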


Question 5: A Flink 1.11 SQL job reports an error in the JobManager (JM) log on submission


I submit a SQL job with Flink 1.11 and see the exception below in the JM log. The job runs several DDL statements through tEnv.executeSql, adds several DML statements through tEnv.createStatementSet, and then calls execute. What could be causing this exception? Also, although the exception is thrown, the job still starts and runs normally. Why is that? Is it not recommended to use executeSql and StatementSet.execute in the same job?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0]
    ... 24 more

(From the Flink mailing list archive, compiled by volunteers.)


Reference answer:

Flink 1.11 changed how StreamTableEnvironment.execute() and StreamExecutionEnvironment.execute() run jobs. In short:

  1. StreamTableEnvironment.execute() can only run jobs defined through the sqlUpdate and insertInto methods.
  2. Once a Table has been converted to a DataStream, the job can only be run through StreamExecutionEnvironment.execute().
  3. The newly introduced TableEnvironment.executeSql() and StatementSet.execute() methods run the SQL job directly (submitting it asynchronously), so there is no need to call StreamTableEnvironment.execute() or StreamExecutionEnvironment.execute() afterwards.

See [1] and [2] for details.

Regarding "No operators defined in streaming topology.": if a job has already been submitted with TableEnvironment.executeSql() or StatementSet.execute(), and StreamTableEnvironment.execute() or StreamExecutionEnvironment.execute() is then called to submit a job again, that error appears.

As for "Is it not recommended to use executeSql and StatementSet.execute in the same job?", the answer is no. executeSql and StatementSet do not interfere with each other. To track down the error, could you describe the job submission flow in more detail?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
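
The exception in the question is raised from `StreamContextEnvironment.validateAllowedExecution`, which the stack trace shows being reached through `TableEnvironmentImpl.executeInternal`. A toy model of such a guard, assuming it simply counts job submissions and rejects a second one when single-job execution is enforced, might look like the sketch below. `SingleJobEnvironment`, `submit`, `runDdl`, and `enforceSingleJob` are hypothetical names and this is not Flink's implementation. In the model a DDL statement submits no job, while each INSERT run through `executeSql` and each `StatementSet.execute()` counts as one submission, in line with point 3 of the answer.

```scala
// Toy model of a "single execution per environment" guard (hypothetical names, NOT Flink code).
final class SingleJobEnvironment(enforceSingleJob: Boolean) {
  private var jobCounter = 0

  /** Models one job submission, e.g. an INSERT via executeSql or one StatementSet.execute(). */
  def submit(jobName: String): Either[String, String] =
    if (enforceSingleJob && jobCounter > 0)
      Left("Cannot have more than one execute() or executeAsync() call in a single environment.")
    else {
      jobCounter += 1
      Right(s"$jobName submitted as job #$jobCounter")
    }

  /** Models a DDL statement: it only changes the catalog and submits no job. */
  def runDdl(statement: String): Unit = ()
}

object SingleJobEnvironmentDemo {
  def main(args: Array[String]): Unit = {
    val env = new SingleJobEnvironment(enforceSingleJob = true)
    env.runDdl("CREATE TABLE A_source (...)")
    env.runDdl("CREATE TABLE B_sink (...)")
    println(env.submit("statement set with all INSERTs")) // first submission is accepted
    println(env.submit("an extra execute() call"))        // second submission is rejected
  }
}
```

Under this model, collecting all INSERT statements into one StatementSet and calling execute() once keeps the program to a single submission. Whether a second submission is what happened in the asker's job cannot be determined from the thread.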


For more answers to this question, see the original post: https://developer.aliyun.com/ask/372274?spm=a2c6h.14164896.0.0.382263bfYd1g6h

