Flink之Table API与SQL(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Table API与SQL概念


Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个 特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了 Flink 未来的 技术架构



Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。


Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。


Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于 批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。


Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你 可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先 用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你 可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已 经预处理好的数据。


(2)Table 程序依赖


取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

(3)两种计划器(Planner)的主要区别

Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。

Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。

旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。

基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 配置 )

PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。

Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。

旧计划器目前不支持 catalog 统计数据,而 Blink 支持。


(4)Table API 和 SQL 程序的结构


Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。

// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...


(5)创建 TableEnvironment


TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:


在内部的 catalog 中注册 Table

注册外部的 catalog

加载可插拔模块

执行 SQL 查询

注册自定义函数 (scalar、table 或 aggregation)

将 DataStream 或 DataSet 转换成 Table

持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。


TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);


(6)Table API与SQL开发


对batch数据源通过Table API对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchTable {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 4.将DataSet转换成table表
        Table table = tableEnvironment.fromDataSet(dataSet);
        // 5.通过Table API对table表做逻辑处理,生成结果表
        Table tableResult = table.groupBy($("wordName"))
                .select($("wordName"), $("freq").sum().as("freq"))
                .filter($("freq").isEqual(2));
        tableResult.printSchema();
        /**
         * root
         *  |-- wordName: STRING
         *  |-- freq: BIGINT
         */
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
    /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对batch数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // Table table = tableEnvironment.fromDataSet(dataSet);
        // 4.创建虚拟表
        // tableEnvironment.createTemporaryView("wordcount" ,table);
        // 将dataSet转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataSet,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" );
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
        /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对stream数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountStreamSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.batch数据源
        DataStream<WC> dataStream = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 将dataStream流转换为table表
        Table wordcount = tableEnvironment.fromDataStream(dataStream,$("wordName"), $("freq"));
        // 4.将dataStream转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataStream,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" +
                "union all select wordName,sum(freq) as freq from "+wordcount+" group by wordName having sum(freq) > 1");
        // 6.将table表转换为DataStream
        DataStream<Tuple2<Boolean, WC>> retractStream = tableEnvironment.toRetractStream(tableResult, WC.class);
        retractStream.print();
        env.execute();
        /**
         * 15> (true,WC{wordName='java', freq=2})
         * 15> (true,WC{wordName='java', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         */
    }
}





相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
12
分享
相关文章
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
103 26
|
4月前
|
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
227 15
|
1月前
|
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
149 14
|
3月前
|
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
73 0
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
63 1
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
685 2
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
829 6
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
122 4
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
184 3
Flink SQL 问题之服务器报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
158 3