开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

我在执行一个很大的sql时,报Only up to 63 inputs are supported

我在执行一个很大的sql时,报Only up to 63 inputs are supported at once, while encountered 67,这是个问题是否可以通过修改参数的方式解决,哪位老师可以指点一下,非常感谢,详细错误信息如下所示:

res45: String = wagemonth,wagetype,managecom,agentcode,branchtype,agentgrade,gradelevel,gradeseries,businessmodel,basetype,employdate,agentstate,outworkdate,basesalary,branchattr,agentgroup,citytype,operator,makedate,maketime,modifydate,modifytime,d01,d02,d03,d04,d05,s01,s02,s03,s04,s05,n001,n002,n003,n004,n005,n006,n007,n008,n009,n010,n011,n012,n013,n014,n015,n016,n017,n018,n019,n020,n021,n022,n023,n024,n025,n026,n027,n028,n029,n030,n031,n032,n033,n034,n035,n036,n037,n038,n039,n040,n041,n042,n043,n044,n045,n046,n047,n048,n049,n050,n051,n052,n053,n054,n055,n056,n057,n058,n059,n060... java.lang.IllegalArgumentException: Only up to 63 inputs are supported at once, while encountered 67 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160) at org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler.checkSupportedInputCount(MultipleInputSelectionHandler.java:79) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateInternal(MultiInputTransformationTranslator.java:96) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateForBatchInternal(MultiInputTransformationTranslator.java:56) at org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator.translateForBatchInternal(MultiInputTransformationTranslator.java:49) at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForBatch(SimpleTransformationTranslator.java:49) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:831) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:853) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:811) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:562) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.generateStreamGraph(StreamExecutionEnvironment.java:2267) at org.apache.flink.table.planner.delegation.DefaultExecutor.createPipeline(DefaultExecutor.java:83) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:847) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) at executeSql(:79) ... 80 elided

展开
收起
十一0204 2023-04-11 09:04:03 345 0
2 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在 Flink 中,一个 SQL 语句的输入源数量是有限制的。具体来说,Flink 1.13 版本之前,一个 SQL 语句的输入源最多只能有 63 个,如果超过这个数量就会报错。

    对于这个问题,您可以通过修改 Flink 的配置参数来解决:

    1. 在 Flink 的配置文件中,添加以下参数:

      table.exec.spill-compression.enabled: true
      table.exec.spill-compression.block-size: 128MB
      

      这些参数可以开启 Flink 的 SpillableTableFactory,从而允许 Flink 处理更多的输入源。

    2. 如果还是需要处理更多的输入源,可以调整 Flink 的 JVM 参数,例如:

      -XX:MaxDirectMemorySize=8g
      -XX:MaxMetaspaceSize=1g
      -XX:MaxHeapSize=16g
      

      这些参数可以增加 Flink 的内存限制,从而允许 Flink 处理更多的输入源。

    2023-04-26 22:41:26
    赞同 展开评论 打赏
  • 坚持这件事孤独又漫长。
    • 这个问题是由于 Flink 中的一个默认参数限制导致的,可以通过修改参数的方式来解决。具体来说,需要修改参数 flink.optimizer.join.reorder-input-sets.max-size,将其值调大即可。该参数指定了在哪个阶段,Flink 算子将尝试重新组合连接输入,并将其限制在最大输入数量。默认值为 63,因此报错信息中提示了 "Only up to 63 inputs are supported at once"。你可以尝试将该参数值调整为一个较大的整数。

    • 例如,你可以通过以下方式将该参数的值修改为 100

    set flink.optimizer.join.reorder-input-sets.max-size=100;

    
    - 在修改完参数后,重新执行原来的命令即可。需要注意的是,修改参数可能会影响程序的性能,因此需要根据实际情况进行权衡。
    
    
    2023-04-13 16:19:53
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载