Flink之Table API与SQL(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)Table API与SQL概念


Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个 特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了 Flink 未来的 技术架构



Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。


Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。


Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于 批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。


Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你 可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先 用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你 可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已 经预处理好的数据。


(2)Table 程序依赖


取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:

<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.12.0</version>
</dependency>

(3)两种计划器(Planner)的主要区别

Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。

Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。

旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。

基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 配置 )

PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。

Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。

旧计划器目前不支持 catalog 统计数据,而 Blink 支持。


(4)Table API 和 SQL 程序的结构


Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。

// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...


(5)创建 TableEnvironment


TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:


在内部的 catalog 中注册 Table

注册外部的 catalog

加载可插拔模块

执行 SQL 查询

注册自定义函数 (scalar、table 或 aggregation)

将 DataStream 或 DataSet 转换成 Table

持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。


TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);


(6)Table API与SQL开发


对batch数据源通过Table API对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchTable {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 4.将DataSet转换成table表
        Table table = tableEnvironment.fromDataSet(dataSet);
        // 5.通过Table API对table表做逻辑处理,生成结果表
        Table tableResult = table.groupBy($("wordName"))
                .select($("wordName"), $("freq").sum().as("freq"))
                .filter($("freq").isEqual(2));
        tableResult.printSchema();
        /**
         * root
         *  |-- wordName: STRING
         *  |-- freq: BIGINT
         */
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
    /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对batch数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountBatchSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(old planner)
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env);
        // 3.batch数据源
        DataSet<WC> dataSet = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // Table table = tableEnvironment.fromDataSet(dataSet);
        // 4.创建虚拟表
        // tableEnvironment.createTemporaryView("wordcount" ,table);
        // 将dataSet转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataSet,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" );
        // 6.将table表转换为DataSet
        DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class);
        wcDataSet.print();
        /**
         * WC{wordName='java', freq=2}
         * WC{wordName='hbase', freq=2}
         */
    }
}

对stream数据源通过SQL对表查询

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class WordCountStreamSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.batch数据源
        DataStream<WC> dataStream = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 将dataStream流转换为table表
        Table wordcount = tableEnvironment.fromDataStream(dataStream,$("wordName"), $("freq"));
        // 4.将dataStream转换为视图
        tableEnvironment.createTemporaryView("wordcount" ,dataStream,$("wordName"), $("freq"));
        // 5.通过SQL对表的查询,生成结果表
        Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " +
                "from wordcount group by wordName having sum(freq) > 1" +
                "union all select wordName,sum(freq) as freq from "+wordcount+" group by wordName having sum(freq) > 1");
        // 6.将table表转换为DataStream
        DataStream<Tuple2<Boolean, WC>> retractStream = tableEnvironment.toRetractStream(tableResult, WC.class);
        retractStream.print();
        env.execute();
        /**
         * 15> (true,WC{wordName='java', freq=2})
         * 15> (true,WC{wordName='java', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         * 7> (true,WC{wordName='hbase', freq=2})
         */
    }
}





相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之使用API调用ODPS SQL时,出现资源被定时任务抢占,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
API Java 数据库连接
从平凡到卓越:Hibernate Criteria API 让你的数据库查询瞬间高大上,彻底告别复杂SQL!
【8月更文挑战第31天】构建复杂查询是数据库应用开发中的常见需求。Hibernate 的 Criteria API 以其强大和灵活的特点,允许开发者以面向对象的方式构建查询逻辑,同时具备 SQL 的表达力。本文将介绍 Criteria API 的基本用法并通过示例展示其实际应用。此 API 通过 API 构建查询条件而非直接编写查询语句,提高了代码的可读性和安全性。无论是简单的条件过滤还是复杂的分页和连接查询,Criteria API 均能胜任,有助于提升开发效率和应用的健壮性。
131 0
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL FOREIGN KEY 约束。
56 5
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL FOREIGN KEY 约束
【7月更文挑战第19天】CREATE TABLE 时的 SQL FOREIGN KEY 约束
41 8
|
4月前
|
SQL Oracle 关系型数据库
ALTER TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】ALTER TABLE 时的 SQL PRIMARY KEY 约束。
44 3
|
4月前
|
SQL Oracle 关系型数据库
CREATE TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第24天】CREATE TABLE 时的 SQL PRIMARY KEY 约束。
43 2
|
4月前
|
SQL 关系型数据库 MySQL
SQL CREATE TABLE 语句
【7月更文挑战第18天】SQL CREATE TABLE 语句。
45 6
|
4月前
|
SQL Oracle 关系型数据库
ALTER TABLE 时的 SQL PRIMARY KEY 约束
【7月更文挑战第19天】ALTER TABLE 时的 SQL PRIMARY KEY 约束。
41 3
|
4月前
|
SQL 关系型数据库 MySQL
ALTER TABLE 时的 SQL DEFAULT 约束
【7月更文挑战第20天】ALTER TABLE 时的 SQL DEFAULT 约束。
42 1
|
4月前
|
SQL 关系型数据库 MySQL
SQL CREATE TABLE 语句
【7月更文挑战第16天】SQL CREATE TABLE 语句。
39 2

热门文章

最新文章

下一篇
无影云桌面