(1)Table API与SQL概念
Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个 特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了 Flink 未来的 技术架构
Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。
Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于 批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你 可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先 用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你 可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已 经预处理好的数据。
(2)Table 程序依赖
取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:
<!-- Either... --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.12.0</version> </dependency> <!-- or... --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.12.0</version> </dependency>
除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:
<!-- Either... (for the old planner that was available before Flink 1.9) --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.12.0</version> </dependency> <!-- or.. (for the new Blink planner) --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.12.0</version> </dependency>
内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.12.0</version> </dependency>
(3)两种计划器(Planner)的主要区别
Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 配置 )
PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
(4)Table API 和 SQL 程序的结构
Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table,用作查询的输入和输出。所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。
// create a TableEnvironment for specific planner batch or streaming TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // create an input Table tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )"); // register an output Table tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )"); // create a Table object from a Table API query Table table2 = tableEnv.from("table1").select(...); // create a Table object from a SQL query Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... "); // emit a Table API result Table to a TableSink, same for SQL result TableResult tableResult = table2.executeInsert("outputTable"); tableResult...
(5)创建 TableEnvironment
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
在内部的 catalog 中注册 Table
注册外部的 catalog
加载可插拔模块
执行 SQL 查询
注册自定义函数 (scalar、table 或 aggregation)
将 DataStream 或 DataSet 转换成 Table
持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。
TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); // ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); // ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings); // ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
(6)Table API与SQL开发
对batch数据源通过Table API对表查询
package com.aikfk.flink.sql; import com.aikfk.flink.sql.pojo.WC; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import static org.apache.flink.table.api.Expressions.$; /** * @author :caizhengjie * @description:TODO * @date :2021/4/5 10:32 下午 */ public class WordCountBatchTable { public static void main(String[] args) throws Exception { // 1.准备环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2.创建TableEnvironment(old planner) BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env); // 3.batch数据源 DataSet<WC> dataSet = env.fromElements( new WC("java", 1), new WC("spark", 1), new WC("hive", 1), new WC("hbase", 1), new WC("hbase", 1), new WC("hadoop", 1), new WC("java", 1)); // 4.将DataSet转换成table表 Table table = tableEnvironment.fromDataSet(dataSet); // 5.通过Table API对table表做逻辑处理,生成结果表 Table tableResult = table.groupBy($("wordName")) .select($("wordName"), $("freq").sum().as("freq")) .filter($("freq").isEqual(2)); tableResult.printSchema(); /** * root * |-- wordName: STRING * |-- freq: BIGINT */ // 6.将table表转换为DataSet DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class); wcDataSet.print(); /** * WC{wordName='java', freq=2} * WC{wordName='hbase', freq=2} */ } }
对batch数据源通过SQL对表查询
package com.aikfk.flink.sql; import com.aikfk.flink.sql.pojo.WC; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import static org.apache.flink.table.api.Expressions.$; /** * @author :caizhengjie * @description:TODO * @date :2021/4/5 10:32 下午 */ public class WordCountBatchSQL { public static void main(String[] args) throws Exception { // 1.准备环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2.创建TableEnvironment(old planner) BatchTableEnvironment tableEnvironment = BatchTableEnvironment.create(env); // 3.batch数据源 DataSet<WC> dataSet = env.fromElements( new WC("java", 1), new WC("spark", 1), new WC("hive", 1), new WC("hbase", 1), new WC("hbase", 1), new WC("hadoop", 1), new WC("java", 1)); // Table table = tableEnvironment.fromDataSet(dataSet); // 4.创建虚拟表 // tableEnvironment.createTemporaryView("wordcount" ,table); // 将dataSet转换为视图 tableEnvironment.createTemporaryView("wordcount" ,dataSet,$("wordName"), $("freq")); // 5.通过SQL对表的查询,生成结果表 Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " + "from wordcount group by wordName having sum(freq) > 1" ); // 6.将table表转换为DataSet DataSet<WC> wcDataSet = tableEnvironment.toDataSet(tableResult, WC.class); wcDataSet.print(); /** * WC{wordName='java', freq=2} * WC{wordName='hbase', freq=2} */ } }
对stream数据源通过SQL对表查询
package com.aikfk.flink.sql; import com.aikfk.flink.sql.pojo.WC; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; /** * @author :caizhengjie * @description:TODO * @date :2021/4/5 10:32 下午 */ public class WordCountStreamSQL { public static void main(String[] args) throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.创建TableEnvironment(Blink planner) EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings); // 3.batch数据源 DataStream<WC> dataStream = env.fromElements( new WC("java", 1), new WC("spark", 1), new WC("hive", 1), new WC("hbase", 1), new WC("hbase", 1), new WC("hadoop", 1), new WC("java", 1)); // 将dataStream流转换为table表 Table wordcount = tableEnvironment.fromDataStream(dataStream,$("wordName"), $("freq")); // 4.将dataStream转换为视图 tableEnvironment.createTemporaryView("wordcount" ,dataStream,$("wordName"), $("freq")); // 5.通过SQL对表的查询,生成结果表 Table tableResult = tableEnvironment.sqlQuery("select wordName,sum(freq) as freq " + "from wordcount group by wordName having sum(freq) > 1" + "union all select wordName,sum(freq) as freq from "+wordcount+" group by wordName having sum(freq) > 1"); // 6.将table表转换为DataStream DataStream<Tuple2<Boolean, WC>> retractStream = tableEnvironment.toRetractStream(tableResult, WC.class); retractStream.print(); env.execute(); /** * 15> (true,WC{wordName='java', freq=2}) * 15> (true,WC{wordName='java', freq=2}) * 7> (true,WC{wordName='hbase', freq=2}) * 7> (true,WC{wordName='hbase', freq=2}) */ } }