开发者社区> 问答> 正文

Apache Flink - 启用连接排序

我注意到Apache Flink没有优化表连接的顺序。目前,它保留了用户指定的连接顺序(基本上,它按字面顺序进行查询)。我想Apache Calcite可以优化连接的顺序,但由于某些原因,这些规则在Apache Flink中没有使用。

例如,如果我们有两个表' R '和' S '

private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
我们假设' S '是空的,我们希望以两种方式加入这些表:

val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")

    .join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")

val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")

      .join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")

如果我们想要计算tableOne和tableTwo中的行数,则两种情况下的结果都将为零。问题是评估tableOne所需的时间比评估tableTwo要长得多。

有没有什么办法可以自动优化连接的执行顺序,甚至通过添加一些统计数据来启用可能的计划成本操作?如何添加这些统计数据?

可能有必要更改表环境CalciteConfig,但我不清楚如何做到这一点。

展开
收起
flink小助手 2018-12-13 14:49:53 2934 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    未启用加入重新排序,因为Flink无法很好地处理统计信息。重新排序连接没有一些准确的基数估计基本上是赌博。因此,禁用连接重新排序,并按用户提供的顺序连接表。这给出了确定性和可控制的行为。

    但是,你可以通过一个经过优化的规则进入优化TableConfig与CalciteConfig创建时TableEnvironment,即TableEnvironment.getTableEnvironment(ENV,yourTableConfig)。在CalciteConfig您可以添加优化规则不同的优化阶段。你可能想添加JoinCommunteRule和JoinAssociateRule到逻辑优化阶段。您可能还需要深入研究代码以检查如何将统计信息传递到优化程序。

    2019-07-17 23:20:44
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像