我在执行一个很大的sql时,报Only up to 63 inputs are supported at once, while encountered 67,这是个问题是否可以通过修改参数的方式解决,哪位老师可以指点一下,非常感谢,详细错误信息如下所示:
res45: String = wagemonth
,wagetype
,managecom
,agentcode
,branchtype
,agentgrade
,gradelevel
,gradeseries
,businessmodel
,basetype
,employdate
,agentstate
,outworkdate
,basesalary
,branchattr
,agentgroup
,citytype
,operator
,makedate
,maketime
,modifydate
,modifytime
,d01
,d02
,d03
,d04
,d05
,s01
,s02
,s03
,s04
,s05
,n001
,n002
,n003
,n004
,n005
,n006
,n007
,n008
,n009
,n010
,n011
,n012
,n013
,n014
,n015
,n016
,n017
,n018
,n019
,n020
,n021
,n022
,n023
,n024
,n025
,n026
,n027
,n028
,n029
,n030
,n031
,n032
,n033
,n034
,n035
,n036
,n037
,n038
,n039
,n040
,n041
,n042
,n043
,n044
,n045
,n046
,n047
,n048
,n049
,n050
,n051
,n052
,n053
,n054
,n055
,n056
,n057
,n058
,n059
,n060
... java.lang.IllegalArgumentException: Only up to 63 inputs are supported at once, while encountered 67 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) at org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler.checkSupportedInputCount(MultipleInputSelectionHandler.java:79) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateInternal(MultiInputTransformationTranslator.java:96) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateForBatchInternal(MultiInputTransformationTranslator.java:56) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateForBatchInternal(MultiInputTransformationTranslator.java:49) at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForBatch(SimpleTransformationTranslator.java:49) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:831) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2267) at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:847) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) at executeSql(:79) ... 80 elided
在 Flink 中,一个 SQL 语句的输入源数量是有限制的。具体来说,Flink 1.13 版本之前,一个 SQL 语句的输入源最多只能有 63 个,如果超过这个数量就会报错。
对于这个问题,您可以通过修改 Flink 的配置参数来解决:
在 Flink 的配置文件中,添加以下参数:
table.exec.spill-compression.enabled: true
table.exec.spill-compression.block-size: 128MB
这些参数可以开启 Flink 的 SpillableTableFactory,从而允许 Flink 处理更多的输入源。
如果还是需要处理更多的输入源,可以调整 Flink 的 JVM 参数,例如:
-XX:MaxDirectMemorySize=8g
-XX:MaxMetaspaceSize=1g
-XX:MaxHeapSize=16g
这些参数可以增加 Flink 的内存限制,从而允许 Flink 处理更多的输入源。
这个问题是由于 Flink 中的一个默认参数限制导致的,可以通过修改参数的方式来解决。具体来说,需要修改参数 flink.optimizer.join.reorder-input-sets.max-size
,将其值调大即可。该参数指定了在哪个阶段,Flink 算子将尝试重新组合连接输入,并将其限制在最大输入数量。默认值为 63,因此报错信息中提示了 "Only up to 63 inputs are supported at once"。你可以尝试将该参数值调整为一个较大的整数。
例如,你可以通过以下方式将该参数的值修改为 100
:
set flink.optimizer.join.reorder-input-sets.max-size=100;
- 在修改完参数后,重新执行原来的命令即可。需要注意的是,修改参数可能会影响程序的性能,因此需要根据实际情况进行权衡。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。