开发者社区> 问答> 正文

flink sql 遇到开启MiniBatch后,程序报错

flink版本1.15

在程序中写了,一个子查询进行聚合统计,然后在这个子查询外面再套了一层聚合统计。这样程序就报错了,报错信息如下:
java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$537'
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction.open(MiniBatchLocalGroupAggFunction.java:59) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator.open(AbstractMapBundleOperator.java:82) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.0.jar:1.15.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) ~[atp-vat-flink-data-1.0-SNAPSHOT.jar:?]
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) ~[flink-table-runtime-1.15.0.jar:1.15.0]
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) ~[flink-table-runtime-1.15.0.jar:1.15.0]
... 12 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 12, Column 28: Cannot determine simple type name "org"

这种情况是在开启minibatch的情况下会报错,如果不加minibathc那么程序能正常执行,我在本地IDEA中都能成功,但是在集群上运行程序就会报错。请问这是什么问题呢?

展开
收起
游客fuzojzpl5x2bu 2024-05-10 16:20:17 18 0
1 条回答
写回答
取消 提交回答
  • 这个错误信息表明在编译生成的Flink SQL执行逻辑时遇到了问题,具体是无法确定类型"org"的简单类型名称。这可能是由于子查询的复杂性或者与MiniBatch模式的兼容性问题导致的。在Flink 1.15版本中,MiniBatch模式是一种优化,用于减少状态更新的频率,提高性能,但它可能对某些查询结构有特定的要求。

    根据错误信息,问题可能出在以下几点:

    1. 类型问题:在SQL查询中,可能有一个类型被错误地标识为"org",这可能是一个拼写错误或者引用了不完整的类型名称。检查所有涉及的类型,确保它们是有效的Flink或Java类型。
    2. 子查询复杂性:Flink可能在处理嵌套的子查询时遇到了困难,尤其是在MiniBatch模式下。尝试简化查询,或者将复杂的子查询拆分为多个阶段的处理。
    3. 编译错误:在IDEA中能成功可能是因为IDEA的编译器或Flink的本地运行模式与集群环境中的编译或执行环境不同。集群环境可能使用了更严格的编译规则或配置。
    4. Flink版本问题:虽然你使用的是1.15版本,但确保集群上的Flink版本与开发环境一致,不一致的版本可能导致兼容性问题。
    5. MiniBatch配置:检查MiniBatch的相关配置,如state.backend.finite-state-operator.mini-batch.enabledstate.backend.finite-state-operator.mini-batch.interval,确保它们是正确的,并且与你的查询兼容。

    建议尝试以下步骤来解决问题:

    • 简化查询:尝试将复杂的子查询拆分成多个简单的部分,或者避免在MiniBatch模式下使用子查询。
    • 禁用MiniBatch:先禁用MiniBatch模式,看是否可以正常运行。如果可以,再逐步调试启用MiniBatch的问题。
    • 更新或回退版本:确保集群和开发环境的Flink版本一致,或者尝试回退到已知稳定的版本。
    • 查阅官方文档:查阅Flink的最新文档,了解关于MiniBatch模式的限制和注意事项。
    • 提交问题:如果上述方法都无法解决问题,考虑在Flink的官方社区或GitHub上提交问题,提供详细的查询语句和配置,以便开发者协助排查。

    记得在调整查询或配置时,始终在与生产环境相同或尽可能接近的环境中进行测试。

    2024-05-10 17:36:38
    赞同 2 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载