我注意到Apache Flink没有优化表连接的顺序。目前,它保留了用户指定的连接顺序(基本上,它按字面顺序进行查询)。我想Apache Calcite可以优化连接的顺序,但由于某些原因,这些规则在Apache Flink中没有使用。
例如,如果我们有两个表' R '和' S '
private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
我们假设' S '是空的,我们希望以两种方式加入这些表:
val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
如果我们想要计算tableOne和tableTwo中的行数,则两种情况下的结果都将为零。问题是评估tableOne所需的时间比评估tableTwo要长得多。
有没有什么办法可以自动优化连接的执行顺序,甚至通过添加一些统计数据来启用可能的计划成本操作?如何添加这些统计数据?
可能有必要更改表环境CalciteConfig,但我不清楚如何做到这一点。
未启用加入重新排序,因为Flink无法很好地处理统计信息。重新排序连接没有一些准确的基数估计基本上是赌博。因此,禁用连接重新排序,并按用户提供的顺序连接表。这给出了确定性和可控制的行为。
但是,你可以通过一个经过优化的规则进入优化TableConfig与CalciteConfig创建时TableEnvironment,即TableEnvironment.getTableEnvironment(ENV,yourTableConfig)。在CalciteConfig您可以添加优化规则不同的优化阶段。你可能想添加JoinCommunteRule和JoinAssociateRule到逻辑优化阶段。您可能还需要深入研究代码以检查如何将统计信息传递到优化程序。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。