Flink SQL代码补全提示(源码分析)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL

01 引言


Flink 源码地址: https://github.com/apache/flink


使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL,如下:


4db1da6379b643caaa36ff1f5e9ed35d.png


Flink也是支持SQL的,当然它也有对应的接口支持SQL提示,本文来讲讲。


02 案例


2.1 源码案例


Flink源码里面已经有代码提示的demo了,具体在CliClientTest.java(点击即可打开)

,具体在如下方法里实现了:


b5f5dcad307e4c70897e8c6a592857da.png


具体的入参为


参数名 含义
statement 当前输入的SQL
position SQL末端光标的位置


返回的内容为可能的SQL提示集合


2.2 举例


比如目前输入的SQL为:


select * fr


入参的内容为:


参数名
statement “select * fr”
position 11


返回的结果为集合


FROM


接下来分析下其源码。


03 源码分析


从上面的例子,我们可以看到,获取提示的方法如下:

@Override
   public List<String> completeStatement(String sessionId, String statement, int position) {
       receivedStatement = statement;
       receivedPosition = position;
       return Arrays.asList(helper.getSqlParser().getCompletionHints(statement, position));
   }
1
2
3
4
5
6

主要的核心是这一句:helper.getSqlParser().getCompletionHints(statement, position)。


指得是通过helper获取sql解析器,然后调用里面的getCompletionHints方法来获取提示。


接下来看看helper类。


3.1 SqlParserHelper


从下图可以得知,helper也是测试包里面的一个类:

899c3a3742244eb9872a46158612d2a9.png

helper只是用于指引用户如何实现的一个工具类在Flink对外的API里是不存在的,这个不重要,我们看看里面的逻辑(里面已写注释):


/**
 * SqlParser 工具类
 *
 * @author : YangLinWei
 * @createTime: 2022/9/22 2:37 下午
 * @version: 1.0.0
 */
public class SqlParserHelper {
    // 从这个TableEnvironment里获取SqlParser解析器实例
    private TableEnvironment tableEnv;
    /**
     * 构造函数
     * <p>
     * 使用默认EnvironmentSettings配置(当然用户可以根据自己的业务场景来设置配置)来初始化TableEnvironment
     */
    public SqlParserHelper() {
        tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    }
    /**
     * 构造函数
     * <p>
     * 主要使用指定的SqlDialect来初始化TableEnvironment
     */
    public SqlParserHelper(SqlDialect sqlDialect) {
        if (sqlDialect == null || SqlDialect.DEFAULT == sqlDialect) {
            tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        } else if (SqlDialect.HIVE == sqlDialect) {
            HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
            tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
            tableEnv.getConfig().setSqlDialect(sqlDialect);
            tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
            tableEnv.useCatalog(hiveCatalog.getName());
        }
    }
    /**
     * 准备一些Flink DDL 来用于测试
     */
    public void registerTables() {
        registerTable(
                "create table MyTable (a int, b bigint, c varchar(32)) "
                        + "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')");
        registerTable(
                "create table MyOtherTable (a int, b bigint) "
                        + "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')");
        registerTable(
                "create table MySink (a int, c varchar(32)) with ('connector' = 'COLLECTION' )");
        registerTable("create view MyView as select * from MyTable");
    }
    public void registerTable(String createTableStmt) {
        tableEnv.executeSql(createTableStmt);
    }
    /**
     * 获取SqlParser解析器
     *
     * @return Sql解析器
     */
    public Parser getSqlParser() {
        return ((TableEnvironmentInternal) tableEnv).getParser();
    }
    public TableEnvironment getTableEnv() {
        return tableEnv;
    }
}


从代码可以分析得出,如果要调用Flink里面的代码提示接口,需要先初始化TableEnviorment,然后获取Sql解析器。


接下来,看看Sql解析器的代码。


3.2 ParserImpl


从如下代码提示,可以看到解析器的实现有几个,这里使用的是blink包里面的ParserImpl


cba4196317bb4d1b8a53debde526a90c.png


ParserImpl 实现了Parser接口,其中Parser接口的代码及注释如下:

/**
 * SQL解析器
 * <p>
 * 主要解析SQL为SQL对象
 *
 * @author : YangLinWei
 * @createTime: 2022/9/22 3:00 下午
 * @version: 1.0.0
 */
@Internal
public interface Parser {
    /**
     * 解析String类型的SQL入口
     * <p>
     * 注意:</b>如果创建的{@link Operation}是一个{@link QueryOperation},
     * 它必须以{@link Planner#translate(List)}方法能够理解的形式出现。
     *
     * @param statement 待解析的SQL
     *
     * @return 将查询解析为关系 {@link Operation}s
     */
    List<Operation> parse(String statement);
    /**
     * 解析SQL标识符集
     *
     * @param identifier SQL唯一标识
     *
     * @return 被解析的SQL唯一标识
     */
    UnresolvedIdentifier parseIdentifier(String identifier);
    /**
     * 解析SQL表达式入口
     *
     * @param sqlExpression 将被解析的SQL表达式
     * @param inputRowType SQL表达式中可用的字段
     * @param outputType 预期的顶级输出类型(如果可用)
     *
     * @return 解析后的表达式
     */
    ResolvedExpression parseSqlExpression(
            String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType);
    /**
     * 在给定的游标位置返回给定语句的代码提示
     * <p>
     * 注意:补全是不区分大小写的。
     *
     * @param statement 部分或轻微错误的SQL语句
     * @param position 光标位置
     *
     * @return 当前光标位置的完成提示
     */
    String[] getCompletionHints(String statement, int position);
}


在本文,我们关注的是getCompletionHints的方法,其实现代码如下(含注释):

/**
 * 在给定的游标位置返回给定语句的代码提示
 * <p>
 * 注意:补全是不区分大小写的。
 *
 * @param statement 部分或轻微错误的SQL语句
 * @param cursor 光标位置
 *
 * @return 当前光标位置的完成提示集合
 */
public String[] getCompletionHints(String statement, int cursor) {
    // 使用ExtendedParser来获取 代码提示的内容集
    List<String> candidates =
            new ArrayList<>(
                    Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor)));
    // 使用SqlAdvisor来获取 代码提示的内容集
    SqlAdvisorValidator validator = validatorSupplier.get().getSqlAdvisorValidator();
    SqlAdvisor advisor =
            new SqlAdvisor(validator, validatorSupplier.get().config().getParserConfig());
    String[] replaced = new String[1];
    List<String> sqlHints =
            advisor.getCompletionHints(statement, cursor, replaced).stream()
                    .map(item -> item.toIdentifier().toString())
                    .collect(Collectors.toList());
    // 取代码提示的并集,并返回
    candidates.addAll(sqlHints);
    return candidates.toArray(new String[0]);
}


从代码可以得知,获取提示内容集的逻辑是分别使用“ExtendedParser”和“SqlAdvisor”来获取提示内容的集合,进一步看看这两个类


3.2.1 ExtendedParser


ExtendedParser获取SQL提示的方法代码如下(含注释):

public String[] getCompletionHints(String statement, int cursor) {
    // SQL 语句转为大写
    String normalizedStatement = statement.trim().toUpperCase();
    List<String> hints = new ArrayList<>();
    // 遍历 解析策略 集合
    for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) {
        // 判断查询的SQL语句是否策略里面hints集合的开头,如果是,则返回提示
        for (String hint : strategy.getHints()) {
            if (hint.startsWith(normalizedStatement) && cursor < hint.length()) {
                hints.add(getCompletionHint(normalizedStatement, hint));
            }
        }
    }
    return hints.toArray(new String[0]);
}


上述代码主要做的事情就是判断传入的SQL是否为定义的策略集合里面的提示内容的前缀,如果是,则直接返回提示


策略集合的实现有几个:

1d7faaa38a2641e58d4c36e12a3c0e3b.png


整理里面getHints方法的内容,有如下的内容:

微信截图_20221010155007.png

总的来说,就是根据SQL语句,能返回的提示就是以上表格getHints方法里面返回的内容合集之一。


3.2.2 SqlAdvisor


SqlAdvisor获取SQL提示的方法代码如下(含注释):

public List<SqlMoniker> getCompletionHints(String sql, int cursor, String[] replaced) {
        int wordStart = cursor;
        boolean quoted;
        // 获取关键字起始游标
        for(quoted = false; wordStart > 0 && Character.isJavaIdentifierPart(sql.charAt(wordStart - 1)); --wordStart) {
        }
        if (wordStart > 0 && sql.charAt(wordStart - 1) == this.quoteStart()) {
            quoted = true;
            --wordStart;
        }
        if (wordStart < 0) {
            return Collections.emptyList();
        } else {
          // 获取关键字结束游标
            int wordEnd;
            for(wordEnd = cursor; wordEnd < sql.length() && Character.isJavaIdentifierPart(sql.charAt(wordEnd)); ++wordEnd) {
            }
            if (quoted && wordEnd < sql.length() && sql.charAt(wordEnd) == this.quoteEnd()) {
                ++wordEnd;
            }
      // 获取关键字
            String word = replaced[0] = sql.substring(wordStart, cursor);
            if (wordStart < wordEnd) {
              // SQL去除关键字
                sql = sql.substring(0, wordStart) + sql.substring(wordEnd);
            }
      // 根据 已去除关键字的SQL + 关键字进一步查询
            List<SqlMoniker> completionHints = this.getCompletionHints0(sql, wordStart);
            if (quoted) {
                word = word.substring(1);
            }
            if (word.isEmpty()) {
                return completionHints;
            } else {
                List<SqlMoniker> result = new ArrayList();
                Casing preferredCasing = this.getPreferredCasing(word);
                boolean ignoreCase = preferredCasing != Casing.UNCHANGED;
                Iterator var12 = completionHints.iterator();
                while(var12.hasNext()) {
                    SqlMoniker hint = (SqlMoniker)var12.next();
                    List<String> names = hint.getFullyQualifiedNames();
                    String cname = (String)Util.last(names);
                    if (cname.regionMatches(ignoreCase, 0, word, 0, word.length())) {
                        result.add(hint);
                    }
                }
                return result;
            }
        }
    }


从上述代码,可以看到主要是为了获取 sql 中的关键字以及去除关键字后的sql,比如:

select * fro


那么关键字为 “fro”,去除关键字后的sql为“select * ”,进一步看getCompletionHints0方法,代码及注释如下:

/**
*
* @Param sql 去除关键字后的sql
* @Param cursor 关键字的起始游标
*
*/
public List<SqlMoniker> getCompletionHints0(String sql, int cursor) {
  // 封装SQL,后缀加上 "_suggest_"
    String simpleSql = this.simplifySql(sql, cursor);
    int idx = simpleSql.indexOf("_suggest_");
    if (idx < 0) {
        return Collections.emptyList();
    } else {
        SqlParserPos pos = new SqlParserPos(1, idx + 1);
        // 获取SQL提示
        return this.getCompletionHints(simpleSql, pos);
    }
}


进一步查询SQL提示:

c33900228fc84b53b3c87705c970fdec.png


上述的代码不是重点,最为重要的提示是从上述红框里获取的,及从validator里面的lookupHints里获取,看看是怎样获取的。

b5abe8acc42e466f95e061e841e687df.png


进一步查看lookupSelectHints方法:

7ac5bb41dbf14e4fa2aacf9d7eaf93a3.png

继续断点,查看lookupFromHints方法:

fc7276608fe04c8d999df7f458257c76.png

继续看看SqlValidatorUtil.getSchemaObjectMonikers方法:

43eef0c8b1d44c5d9a417b2398339f71.png


可以看出,内容是从SqlValidatorCatalogReader.getAllSchemaObjectNames里获取出来的,也就是把所有的元数据获取出来(依次从 catalog -> database -> table遍历出来)。


代码跟踪了那么久,其实主要的核心还是在SqlValidatorCatalogReader.getAllSchemaObjectNames获取所有的元数据的,这里就不再做解析了,其实就是不断遍历的一个过程。


3.3 比对


到最后,拿之前解析出来的关键字与获取到的元数据进行match匹配就可以得出最终的SQL提示了:


b9de37ebf4344052ac28651a48ff7a6e.png


04 文末


本文主要讲解了Flink SQL的代码补全提示功能,本质其实就是使用

SqlValidatorCatalogReader.getAllSchemaObjectNames 来获取所有的元数据,然后跟关键字匹配,并返回提示结果集。


谢谢大家的阅读,希望能帮助到大家,本文完!


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
202 15
|
7天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
70 14
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
60 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
93 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
52 1
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1229 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
157 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
867 7
阿里云实时计算Flink在多行业的应用和实践
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。