Flink 1.13.0 sql-client 新特性及源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 在 Flink 1.13.0 版本中增加了很多新特征,具体可以参考前面一篇文章,其中很重要的一点是对 sql-client 功能做了加强,支持了初始化脚本和执行 SQL 文件,SQL 客户端是直接运行和部署 SQL 流和批处理作业的便捷方法,而无需从命令行或作为 CI 的一部分来编写任何代码,这个版本大大改进了 SQL 客户端的功能。现在,SQL 客户端和SQL 脚本都支持 Java 应用程序可用的几乎所有操作(通过编程方式从 TableEnvironment 启动查询时)。这意味着 SQL 用户在其 SQL 部署中需要粘贴的代码变的更少.由于篇幅的原因这篇文章只会介绍 SQL CLIENT

在 Flink 1.13.0 版本中增加了很多新特征,具体可以参考前面一篇文章,其中很重要的一点是对 sql-client 功能做了加强,支持了初始化脚本和执行 SQL 文件,SQL 客户端是直接运行和部署 SQL 流和批处理作业的便捷方法,而无需从命令行或作为 CI 的一部分来编写任何代码,这个版本大大改进了 SQL 客户端的功能。现在,SQL 客户端和SQL 脚本都支持 Java 应用程序可用的几乎所有操作(通过编程方式从 TableEnvironment 启动查询时)。这意味着 SQL 用户在其 SQL 部署中需要粘贴的代码变的更少.由于篇幅的原因这篇文章只会介绍 SQL CLIENT -i 和 -f 两个新特性的使用以及源码分析其实现原理.


SQL Client: Init scripts and Statement Sets

这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 Java 应用程序可用的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能就是支持了 -i 命令用来初始化脚本,-f 命令用来执行 SQL 语句,之前的 YAML 文件这个版本不再支持了,相反更多的是通过 SQL 脚本的方式来配置会话和提交任务.


类似于下面这种方式:


sql-client.sh -i init.sql -f test.sql


准备 init.sql 和 test.sql 脚本

init.sql

-- 设置 catalog (这个目前是不支持的)
USE CATALOG myhive;
SET execution.runtime-mode=streaming;
SET pipeline.name=my_flink_job;
SET parallism.default=4;


注意 init.sql 脚本里面是不支持加注释的,官方文档上的 demo 中有通过 -- 的方式加注释,其实这个是不支持的,如果加了的话解析 init.sql 脚本的时候会直接抛出异常.另外 init.sql 初始化脚本文件支持的功能还非常多,我这里就简单的设置了几个,更多的属性可以参考官网.


使用 -i <init.sql> 选项初始化 SQL Client 会话时,初始化 SQL 文件中允许以下语句:


DDL(CREATE/DROP/ALTER),


USE CATALOG/DATABASE,


LOAD/UNLOAD MODULE,


SET command,


RESET command.


test.sql

-- sql
DROP TABLE IF EXISTS PERSON;
CREATE TABLE PERSON (
    name VARCHAR COMMENT '姓名',
    age int COMMENT '年龄',
    city VARCHAR COMMENT '所在城市',
    address VARCHAR COMMENT '家庭住址',
    money DOUBLE COMMENT '金钱',
    ts BIGINT  COMMENT '时间戳',
    t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
    proctime as PROCTIME(),
    WATERMARK FOR t AS t - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test',
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'true'  -- 解析失败跳过
);
DROP TABLE IF EXISTS print_table_1;
CREATE TABLE print_table_1
(
    name string,
    age INT,
    city string
) 
WITH ('connector' = 'print');
BEGIN STATEMENT SET;
insert into print_table_1 select name,age,city from PERSON;
insert into print_table_1 select name,age,city from PERSON;
END;


test.sql 脚本里面是可以通过 -- 加注释的,并且支持执行一组 SQL 语句,也就是可以同时执行多条 SQL 语句  SQL Client 将每个 INSERT INTO 语句作为单个 Flink 作业执行。但是,由于管道的某些部分可以重复使用,因此有时不是最佳选择。SQL Client 支持 STATEMENT SET 语法来执行一组 SQL 语句。这是 Table API 中StatementSet 的等效功能。STATEMENT SET 语法包含一个或多个 INSERT INTO 语句。全面优化了STATEMENT SET 块中的所有语句,并将其作为单个 Flink 作业执行。联合优化和执行允许重用常见的中间结果,因此可以显着提高执行多个查询的效率。


STATEMENT SET 的语法格式如下:


BEGIN STATEMENT SET;
  -- one or more INSERT INTO statements
  { INSERT INTO|OVERWRITE <select_statement>; }+
END;


上面的 test.sql 脚本里面就用到了这种写法,需要注意的是 STATEMENT SET 中包含的语句必须用分号(;)分隔.我这里为了省事,直接把同一条 SQL 复制了两遍.


提交任务

现在就可以直接提交任务了


sql-client.sh -i init.sql -f test.sql
sql-client.sh -f test.sql


-i 命令是可以选的,不用初始化也可以直接使用 -f 提交任务,在这种模式下,客户端不会打开交互式终端,提交任务完成后会自动退出终端.



可以看到任务提交成功了.接下来就来看一下底层源码是怎么实现的.


源码分析

然后来深入源码分析一下实现的过程.首先从 sql-client.sh 脚本里找到执行的入口类是 org.apache.flink.table.client.SqlClient ,然后来看下 SqlClient 对象属性源码如下:


public class SqlClient {
    private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
    // 标记是否是 embedded 模式
    private final boolean isEmbedded;
    // 提交命令选项
    private final CliOptions options;
    // 用来返回结果的
    private final Supplier<Terminal> terminalFactory;
    public static final String MODE_EMBEDDED = "embedded";
    public static final String MODE_GATEWAY = "gateway";
    public SqlClient(boolean isEmbedded, CliOptions options, Supplier<Terminal> terminalFactory) {
        this.isEmbedded = isEmbedded;
        this.options = options;
        this.terminalFactory = terminalFactory;
    }
}


SQL 客户端,用于提交 SQL 语句。客户端可以以两种模式执行:gateway 和 embedded 目前只支持 embedded 模式,并且默认也是这种模式,SqlClient 对象的属性比较简单,代码里面也都添加了注释,就不在介绍了.


接着来看 SqlClient 的 main 方法,也就是程序的入口.main 方法里面调用的是 startClient 方法,所以直接来看 startClient 方法的源码:


@VisibleForTesting
protected static void startClient(String[] args, Supplier<Terminal> terminalFactory) {
    final String mode;
    final String[] modeArgs;
    // 设置启动模式默认是 embedded
    if (args.length < 1 || args[0].startsWith("-")) {
        // mode is not specified, use the default `embedded` mode
        mode = MODE_EMBEDDED;
        modeArgs = args;
    } else {
        // mode is specified, extract the mode value and reaming args
        mode = args[0];
        // remove mode
        modeArgs = Arrays.copyOfRange(args, 1, args.length);
    }
    switch (mode) {
        case MODE_EMBEDDED:
            // 解析提交命令里的参数
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            // 打印参数说明
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    // 构建 SqlClient 对象
                    final SqlClient client = new SqlClient(true, options, terminalFactory);
                    client.start();
                } catch (SqlClientException e) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop.", e);
                    throw e;
                } catch (Throwable t) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error(
                            "SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.",
                            t);
                    throw new SqlClientException(
                            "Unexpected exception. This is a bug. Please consider filing an issue.",
                            t);
                }
            }
            break;
        case MODE_GATEWAY:
            throw new SqlClientException("Gateway mode is not supported yet.");
        default:
            CliOptionsParser.printHelpClient();
    }
}


首先是设置模式,不管你传不传 embedded 参数,都会设置成 embedded 模式,因为目前 Gateway 模式是不支持的.然后会调用 parseEmbeddedModeClient 方法解析提交命令里面的各种参数.包括我们上面用到的 -i 和 -f 都是在这一步解析并赋值的.


public static CliOptions parseEmbeddedModeClient(String[] args) {
    try {
        DefaultParser parser = new DefaultParser();
        CommandLine line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true);
        return new CliOptions(
                line.hasOption(CliOptionsParser.OPTION_HELP.getOpt()),
                checkSessionId(line),
                checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
                checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
             // 这里就是 -i
                checkUrl(line, CliOptionsParser.OPTION_INIT_FILE),
             // 这里就是 -f
                checkUrl(line, CliOptionsParser.OPTION_FILE),
                checkUrls(line, CliOptionsParser.OPTION_JAR),
                checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
                line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
                getPythonConfiguration(line));
    } catch (ParseException e) {
        throw new SqlClientException(e.getMessage());
    }
}
public static final Option OPTION_INIT_FILE =
            Option.builder("i")
                    .required(false)
                    .longOpt("init")
                    .numberOfArgs(1)
                    .argName("initialization file")
                    .desc(
                            "Script file that used to init the session context. "
                                    + "If get error in execution, the sql client will exit. Notice it's not allowed to add query or insert into the init file.")
                    .build();
public static final Option OPTION_FILE =
            Option.builder("f")
                    .required(false)
                    .longOpt("file")
                    .numberOfArgs(1)
                    .argName("script file")
                    .desc(
                            "Script file that should be executed. In this mode, "
                                    + "the client will not open an interactive terminal.")
                    .build();


解析参数完成后会构建 SqlClient 对象,然后就可以启动 SqlClient .


private void start() {
        if (isEmbedded) {
            // create local executor with default environment
            DefaultContext defaultContext = LocalContextUtils.buildDefaultContext(options);
            // 创建执行器
            final Executor executor = new LocalExecutor(defaultContext);
            // 启动
            executor.start();
            // Open an new session
            String sessionId = executor.openSession(options.getSessionId());
            try {
                // add shutdown hook
                Runtime.getRuntime()
                        .addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));
                // do the actual work
                openCli(sessionId, executor);
            } finally {
                executor.closeSession(sessionId);
            }
        } else {
            throw new SqlClientException("Gateway mode is not supported yet.");
        }
    }

接着会创建一个 LocalExecutor 对象,用于本地执行程序,然后会启动起来,真正执行 SQL 的地方是 openCli 方法,源码如下:


private void openCli(String sessionId, Executor executor) {
    Path historyFilePath;
    if (options.getHistoryFilePath() != null) {
        historyFilePath = Paths.get(options.getHistoryFilePath());
    } else {
        historyFilePath =
                Paths.get(
                        System.getProperty("user.home"),
                        SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
    }
    boolean hasSqlFile = options.getSqlFile() != null;
    boolean hasUpdateStatement = options.getUpdateStatement() != null;
    if (hasSqlFile && hasUpdateStatement) {
        throw new IllegalArgumentException(
                String.format(
                        "Please use either option %s or %s. The option %s is deprecated and it's suggested to use %s instead.",
                        CliOptionsParser.OPTION_FILE,
                        CliOptionsParser.OPTION_UPDATE,
                        CliOptionsParser.OPTION_UPDATE.getOpt(),
                        CliOptionsParser.OPTION_FILE.getOpt()));
    }
    try (CliClient cli = new CliClient(terminalFactory, sessionId, executor, historyFilePath)) {
        if (options.getInitFile() != null) {
            // 执行初始化 SQL -i 参数
            boolean success = cli.executeInitialization(readFromURL(options.getInitFile()));
            if (!success) {
                System.out.println(
                        String.format(
                                "Failed to initialize from sql script: %s. Please refer to the LOG for detailed error messages.",
                                options.getInitFile()));
                return;
            } else {
                System.out.println(
                        String.format(
                                "Successfully initialized from sql script: %s",
                                options.getInitFile()));
            }
        }
        if (!hasSqlFile && !hasUpdateStatement) {
            cli.executeInInteractiveMode();
        } else {
            // 执行真正的 SQL 文件 -f
            cli.executeInNonInteractiveMode(readExecutionContent());
        }
    }
}


这个里面会先获取 historyFilePath 的路径,然后判断是否存在 -i -f 这两个文件,如果有的话会先调用 executeInitialization 执行初始化的脚本.实际调用的是 executeInitialization#executeFile 方法来执行脚本,executeFile 的源码如下:


private boolean executeFile(String content, ExecutionMode mode) {
    terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EXECUTE_FILE).toAnsi());
    for (String statement : CliStatementSplitter.splitContent(content)) {
        terminal.writer()
                .println(
                        new AttributedString(String.format("%s%s", prompt, statement))
                                .toString());
        terminal.flush();
        if (!executeStatement(statement, mode)) {
            // cancel execution when meet error or ctrl + C;
            return false;
        }
    }
    return true;
}


其实不管是 -i 还是 -f 最终都会调用 executeFile 这个方法去解析脚本里的内容并且执行,这里方法里面先调用


splitContent 方法去做解析.


public static List<String> splitContent(String content) {
    List<String> statements = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (String line : content.split("\n")) {
        if (isEndOfStatement(line)) {
            buffer.add(line);
            statements.add(String.join("\n", buffer));
            buffer.clear();
        } else {
            buffer.add(line);
        }
    }
    if (!buffer.isEmpty()) {
        statements.add(String.join("\n", buffer));
    }
    return statements;
}
private static boolean isEndOfStatement(String line) {
    return line.replaceAll(MASK, "").trim().endsWith(";");
}


其实就是一行一行的读取初始化脚本和 SQL 脚本里面的内容,然后放到一个 List 里面.然后循环这个 List 调用 executeStatement 方法去执行 SQL 脚本.


private boolean executeStatement(String statement, ExecutionMode executionMode) {
    try {
        final Optional<Operation> operation = parseCommand(statement);
        operation.ifPresent(op -> callOperation(op, executionMode));
    } catch (SqlExecutionException e) {
        printExecutionException(e);
        return false;
    }
    return true;
}


执行之前会先对 SQL 做一个清洗,具体逻辑在 parseCommand 方法中.


private Optional<Operation> parseCommand(String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }
    // meet bad case, e.g ";\n"
    if (stmt.trim().isEmpty()) {
        return Optional.empty();
    }
    Operation operation = executor.parseStatement(sessionId, stmt);
    return Optional.of(operation);
}


其实就是把 SQL 后面的 ; 去掉,并在遇到 bad case 的时候返回空.然后调用 parseStatement 方法将 SQL 语句解析成 Operation,后面的过程就跟 Flink SQL 翻译成代码的过程差不多.就不在往后面跟了.


-f 参数调用的是 executeInNonInteractiveMode 方法,实际也会调用 executeFile 方法,跟 -i 的执行逻辑是一样的.这里就不再分析了.


其实在新版本中除了 -i 和 -f 外还有很多其他的功能,感兴趣的同学可以到官网查看详细内容 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#sql-client-configuration


另外当前的 SQL Client 仅支持嵌入式模式(也就是 embedded 模式)。将来,社区计划通过提供基于 REST 的SQL 客户端网关来扩展其功能,有关更多信息,请参见 FLIP-24 和 FLIP-91。


总结

这篇文章主要介绍了 Flink 1.13.0 版本中对于 SQL Client 在初始化会话和提交 SQL 脚本方面的增强.以及结合源码分析了其实现的原理.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
165 15
|
1月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
53 0
|
1月前
|
SQL 存储 数据库
SQL学习一:ACID四个特性,CURD基本操作,常用关键字,常用聚合函数,五个约束,综合题
这篇文章是关于SQL基础知识的全面介绍,包括ACID特性、CURD操作、常用关键字、聚合函数、约束以及索引的创建和使用,并通过综合题目来巩固学习。
32 1
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
45 0
|
2月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
74 2
|
2月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
46 1
|
3月前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
111 13

热门文章

最新文章