Flink之Table API与SQL(一)

简介: 笔记

(1)Table API与SQL概念


Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个 特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了 Flink 未来的 技术架构



Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。


Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。


Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于 批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。


Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你 可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先 用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你 可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已 经预处理好的数据。


(2)Table 程序依赖


取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

(3)两种计划器(Planner)的主要区别

Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。

Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。

旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。

基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 配置 )

PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。

Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。

旧计划器目前不支持 catalog 统计数据,而 Blink 支持。


(4)Table API 和 SQL 程序的结构


Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。

// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...


(5)创建 TableEnvironment


TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:


在内部的 catalog 中注册 Table

注册外部的 catalog

加载可插拔模块

执行 SQL 查询

注册自定义函数 (scalar、table 或 aggregation)

将 DataStream 或 DataSet 转换成 Table

持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。


TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);


(6)Table API与SQL开发


对batch数据源通过Table API对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchTable {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 4.将DataSet转换成table表
        Table table = tableEnvironment.fromDataSet(dataSet);
        // 5.通过Table API对table表做逻辑处理,生成结果表
        Table tableResult = table.groupBy($("wordName"))
                .select($("wordName"), $("freq").sum().as("freq"))
                .filter($("freq").isEqual(2));
        tableResult.printSchema();
        /**
         * root
         *  |-- wordName: STRING
         *  |-- freq: BIGINT
         */
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
    /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对batch数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // Table table = tableEnvironment.fromDataSet(dataSet);
        // 4.创建虚拟表
        // tableEnvironment.createTemporaryView("wordcount" ,table);
        // 将dataSet转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataSet,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" );
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
        /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对stream数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountStreamSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.batch数据源
        DataStream<WC> dataStream = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 将dataStream流转换为table表
        Table wordcount = tableEnvironment.fromDataStream(dataStream,$("wordName"), $("freq"));
        // 4.将dataStream转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataStream,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" +
                "union all select wordName,sum(freq) as freq from "+wordcount+" group by wordName having sum(freq) > 1");
        // 6.将table表转换为DataStream
        DataStream<Tuple2<Boolean, WC>> retractStream = tableEnvironment.toRetractStream(tableResult, WC.class);
        retractStream.print();
        env.execute();
        /**
         * 15> (true,WC{wordName='java', freq=2})
         * 15> (true,WC{wordName='java', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         */
    }
}





相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
消息中间件 Java Kafka
Flink--4、DateStream API(执行环境、源算子、基本转换算子)
Flink--4、DateStream API(执行环境、源算子、基本转换算子)
Flink--4、DateStream API(执行环境、源算子、基本转换算子)
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2383 27
|
数据采集 算法 小程序
MaaS一体化绿色出行服务,实现城市交通出行碳中和
和管理交通相关服务,以满足消费者的出行需求。旨在深刻理解公众的出行需求,将各种交通模式整合在统一的服务体系与平台上,利用大数据进行决策,以优化资源配置,满足出行者多样化出行需求,并通过统一的互联网应用对外提供服务。
3880 0
MaaS一体化绿色出行服务,实现城市交通出行碳中和
|
Web App开发 安全
利用chrome进行阿里云多工作环境切换
场景:境外银行作为第一家云上银行,不同区域和环境采用阿里云主账号进行隔离,不同的子账号登录态无法共存或者切换,工作中需要频繁进行工作环境切换时就需要先退出再重新登录,非常繁琐,且容易造成混乱,引起不必要的误操作问题。经安全大佬 @先本 指导,利用chrome的多身份功能,完美解决~分享给大家,希望可以帮助大家提高工作效率启用chrome多身份:1、打开chrome浏览器,点击用户头像,点击添加按钮
831 0
|
SQL 关系型数据库 API
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(1)
|
SQL Java 流计算
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(3)
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】
|
负载均衡 Kubernetes 网络协议
Istio:xDS协议解析
Istio:xDS协议解析
Istio:xDS协议解析
|
SQL 消息中间件 存储
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】
|
算法 计算机视觉 Python
【OpenCV】-算子(Sobel、Canny、Laplacian)学习
【OpenCV】-算子(Sobel、Canny、Laplacian)学习
1647 2
|
存储 运维 关系型数据库
探索 Apache Paimon 在阿里智能引擎的应用场景
本文整理自Apache Yarn && Flink Contributor,阿里巴巴智能引擎事业部技术专家王伟骏(鸿历)老师在 5月16日 Streaming Lakehouse Meetup · Online 上的分享。
26542 34
探索 Apache Paimon 在阿里智能引擎的应用场景