01 引言
使用过Navicat
的童鞋都知道,当我们写SQL
的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL
,如下:
Flink也是支持SQL的,当然它也有对应的接口支持SQL提示,本文来讲讲。
02 案例
2.1 源码案例
Flink源码里面已经有代码提示的demo了,具体在CliClientTest.java(点击即可打开) ,具体在如下方法里实现了:
具体的入参为:
参数名 | 含义 |
statement | 当前输入的SQL |
position | SQL末端光标的位置 |
返回的内容为:可能的SQL提示集合
2.2 举例
比如目前输入的SQL为:
select * fr
入参的内容为:
参数名 | 值 |
statement | “select * fr” |
position | 11 |
返回的结果为集合:
FROM
接下来分析下其源码。
03 源码分析
从上面的例子,我们可以看到,获取提示的方法如下:
@Override public List<String> completeStatement(String sessionId, String statement, int position) { receivedStatement = statement; receivedPosition = position; return Arrays.asList(helper.getSqlParser().getCompletionHints(statement, position)); }
主要的核心是这一句:helper.getSqlParser().getCompletionHints(statement, position)。
指得是通过helper获取sql解析器,然后调用里面的getCompletionHints方法来获取提示。
接下来看看helper
类。
3.1 SqlParserHelper
从下图可以得知,helper
也是测试包里面的一个类:
helper
只是用于指引用户如何实现的一个工具类在Flink
对外的API
里是不存在的,这个不重要,我们看看里面的逻辑(里面已写注释):
/** * SqlParser 工具类 * * @author : YangLinWei * @createTime: 2022/9/22 2:37 下午 * @version: 1.0.0 */ public class SqlParserHelper { // 从这个TableEnvironment里获取SqlParser解析器实例 private TableEnvironment tableEnv; /** * 构造函数 * <p> * 使用默认EnvironmentSettings配置(当然用户可以根据自己的业务场景来设置配置)来初始化TableEnvironment */ public SqlParserHelper() { tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); } /** * 构造函数 * <p> * 主要使用指定的SqlDialect来初始化TableEnvironment */ public SqlParserHelper(SqlDialect sqlDialect) { if (sqlDialect == null || SqlDialect.DEFAULT == sqlDialect) { tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); } else if (SqlDialect.HIVE == sqlDialect) { HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog(); tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); tableEnv.getConfig().setSqlDialect(sqlDialect); tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName()); } } /** * 准备一些Flink DDL 来用于测试 */ public void registerTables() { registerTable( "create table MyTable (a int, b bigint, c varchar(32)) " + "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')"); registerTable( "create table MyOtherTable (a int, b bigint) " + "with ('connector' = 'filesystem', 'path' = '/non', 'format' = 'csv')"); registerTable( "create table MySink (a int, c varchar(32)) with ('connector' = 'COLLECTION' )"); registerTable("create view MyView as select * from MyTable"); } public void registerTable(String createTableStmt) { tableEnv.executeSql(createTableStmt); } /** * 获取SqlParser解析器 * * @return Sql解析器 */ public Parser getSqlParser() { return ((TableEnvironmentInternal) tableEnv).getParser(); } public TableEnvironment getTableEnv() { return tableEnv; } }
从代码可以分析得出,如果要调用Flink里面的代码提示接口,需要先初始化TableEnviorment,然后获取Sql解析器。
接下来,看看Sql解析器的代码。
3.2 ParserImpl
从如下代码提示,可以看到解析器的实现有几个,这里使用的是blink
包里面的ParserImpl
。
ParserImpl
实现了Parser
接口,其中Parser
接口的代码及注释如下:
/** * SQL解析器 * <p> * 主要解析SQL为SQL对象 * * @author : YangLinWei * @createTime: 2022/9/22 3:00 下午 * @version: 1.0.0 */ @Internal public interface Parser { /** * 解析String类型的SQL入口 * <p> * 注意:</b>如果创建的{@link Operation}是一个{@link QueryOperation}, * 它必须以{@link Planner#translate(List)}方法能够理解的形式出现。 * * @param statement 待解析的SQL * * @return 将查询解析为关系 {@link Operation}s */ List<Operation> parse(String statement); /** * 解析SQL标识符集 * * @param identifier SQL唯一标识 * * @return 被解析的SQL唯一标识 */ UnresolvedIdentifier parseIdentifier(String identifier); /** * 解析SQL表达式入口 * * @param sqlExpression 将被解析的SQL表达式 * @param inputRowType SQL表达式中可用的字段 * @param outputType 预期的顶级输出类型(如果可用) * * @return 解析后的表达式 */ ResolvedExpression parseSqlExpression( String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType); /** * 在给定的游标位置返回给定语句的代码提示 * <p> * 注意:补全是不区分大小写的。 * * @param statement 部分或轻微错误的SQL语句 * @param position 光标位置 * * @return 当前光标位置的完成提示 */ String[] getCompletionHints(String statement, int position); }
在本文,我们关注的是getCompletionHints
的方法,其实现代码如下(含注释):
/** * 在给定的游标位置返回给定语句的代码提示 * <p> * 注意:补全是不区分大小写的。 * * @param statement 部分或轻微错误的SQL语句 * @param cursor 光标位置 * * @return 当前光标位置的完成提示集合 */ public String[] getCompletionHints(String statement, int cursor) { // 使用ExtendedParser来获取 代码提示的内容集 List<String> candidates = new ArrayList<>( Arrays.asList(EXTENDED_PARSER.getCompletionHints(statement, cursor))); // 使用SqlAdvisor来获取 代码提示的内容集 SqlAdvisorValidator validator = validatorSupplier.get().getSqlAdvisorValidator(); SqlAdvisor advisor = new SqlAdvisor(validator, validatorSupplier.get().config().getParserConfig()); String[] replaced = new String[1]; List<String> sqlHints = advisor.getCompletionHints(statement, cursor, replaced).stream() .map(item -> item.toIdentifier().toString()) .collect(Collectors.toList()); // 取代码提示的并集,并返回 candidates.addAll(sqlHints); return candidates.toArray(new String[0]); }
从代码可以得知,获取提示内容集的逻辑是分别使用“ExtendedParser
”和“SqlAdvisor
”来获取提示内容的集合,进一步看看这两个类。
3.2.1 ExtendedParser
ExtendedParser获取SQL提示的方法代码如下(含注释):
public String[] getCompletionHints(String statement, int cursor) { // SQL 语句转为大写 String normalizedStatement = statement.trim().toUpperCase(); List<String> hints = new ArrayList<>(); // 遍历 解析策略 集合 for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) { // 判断查询的SQL语句是否策略里面hints集合的开头,如果是,则返回提示 for (String hint : strategy.getHints()) { if (hint.startsWith(normalizedStatement) && cursor < hint.length()) { hints.add(getCompletionHint(normalizedStatement, hint)); } } } return hints.toArray(new String[0]); }
上述代码主要做的事情就是判断传入的SQL是否为定义的策略集合里面的提示内容的前缀,如果是,则直接返回提示。
策略集合的实现有几个:
整理里面getHints方法的内容,有如下的内容:
策略者 | getHints方法内容 |
ClearOperationParseStrategy | CLEAR |
HelpOperationParseStrategy | HELP |
QuitOperationParseStrategy | EXIT、QUIT |
ResetOperationParseStrategy | RESET |
SetOperationParseStrategy | SET |
总的来说,就是根据SQL语句,能返回的提示就是以上表格getHints
方法里面返回的内容合集之一。
3.2.2 SqlAdvisor
SqlAdvisor获取SQL提示的方法代码如下(含注释):
public List<SqlMoniker> getCompletionHints(String sql, int cursor, String[] replaced) { int wordStart = cursor; boolean quoted; // 获取关键字起始游标 for(quoted = false; wordStart > 0 && Character.isJavaIdentifierPart(sql.charAt(wordStart - 1)); --wordStart) { } if (wordStart > 0 && sql.charAt(wordStart - 1) == this.quoteStart()) { quoted = true; --wordStart; } if (wordStart < 0) { return Collections.emptyList(); } else { // 获取关键字结束游标 int wordEnd; for(wordEnd = cursor; wordEnd < sql.length() && Character.isJavaIdentifierPart(sql.charAt(wordEnd)); ++wordEnd) { } if (quoted && wordEnd < sql.length() && sql.charAt(wordEnd) == this.quoteEnd()) { ++wordEnd; } // 获取关键字 String word = replaced[0] = sql.substring(wordStart, cursor); if (wordStart < wordEnd) { // SQL去除关键字 sql = sql.substring(0, wordStart) + sql.substring(wordEnd); } // 根据 已去除关键字的SQL + 关键字进一步查询 List<SqlMoniker> completionHints = this.getCompletionHints0(sql, wordStart); if (quoted) { word = word.substring(1); } if (word.isEmpty()) { return completionHints; } else { List<SqlMoniker> result = new ArrayList(); Casing preferredCasing = this.getPreferredCasing(word); boolean ignoreCase = preferredCasing != Casing.UNCHANGED; Iterator var12 = completionHints.iterator(); while(var12.hasNext()) { SqlMoniker hint = (SqlMoniker)var12.next(); List<String> names = hint.getFullyQualifiedNames(); String cname = (String)Util.last(names); if (cname.regionMatches(ignoreCase, 0, word, 0, word.length())) { result.add(hint); } } return result; } } }
从上述代码,可以看到主要是为了获取 sql
中的关键字以及去除关键字后的sql
,比如:
select * fro
那么关键字为 “fro
”,去除关键字后的sql为“select *
”,进一步看getCompletionHints0
方法,代码及注释如下:
/** * * @Param sql 去除关键字后的sql * @Param cursor 关键字的起始游标 * */ public List<SqlMoniker> getCompletionHints0(String sql, int cursor) { // 封装SQL,后缀加上 "_suggest_" String simpleSql = this.simplifySql(sql, cursor); int idx = simpleSql.indexOf("_suggest_"); if (idx < 0) { return Collections.emptyList(); } else { SqlParserPos pos = new SqlParserPos(1, idx + 1); // 获取SQL提示 return this.getCompletionHints(simpleSql, pos); } }
进一步查询SQL提示:
上述的代码不是重点,最为重要的提示是从上述红框里获取的,及从validator里面的lookupHints里获取,看看是怎样获取的。
进一步查看lookupSelectHints
方法:
继续断点,查看lookupFromHints方法:
继续看看SqlValidatorUtil.getSchemaObjectMonikers
方法:
可以看出,内容是从SqlValidatorCatalogReader.getAllSchemaObjectNames里获取出来的,也就是把所有的元数据获取出来(依次从 catalog -> database -> table遍历出来)。
代码跟踪了那么久,其实主要的核心还是在SqlValidatorCatalogReader.getAllSchemaObjectNames
获取所有的元数据的,这里就不再做解析了,其实就是不断遍历的一个过程。
3.3 比对
到最后,拿之前解析出来的关键字与获取到的元数据进行match
匹配就可以得出最终的SQL
提示了:
04 文末
本文主要讲解了Flink SQL的代码补全提示功能,本质其实就是使用SqlValidatorCatalogReader.getAllSchemaObjectNames 来获取所有的元数据,然后跟关键字匹配,并返回提示结果集。
谢谢大家的阅读,希望能帮助到大家,本文完!