哈喽各位,本章主要写的是FlinkSQL也是Flink章节的倒数第二篇了,最后还有一篇FlinkCEP,稍后会出,耐心关注哦!好了,进入正题!!!!
Table API和SQL是最上层的API,在Flink中这两种API被集成在一起,SQL执行的对象也是Flink中的表(Table),所以我们一般会认为它们是一体的。Flink是批流统一的处理框架,无论是批处理(DataSet API)还是流处理(DataStream API),在上层应用中都可以直接使用Table API或者SQL来实现;这两种API对于一张表执行相同的查询操作,得到的结果是完全一样的。我们主要还是以流处理应用为例进行讲解。需要说明的是,Table API和SQL最初并不完善,在Flink 1.9版本合并阿里巴巴内部版本Blink之后发生了非常大的改变,此后也一直处在快速开发和完善的过程中,直到Flink 1.12版本才基本上做到了功能上的完善。而即使是在目前最新的1.13版本中,Table API和SQL也依然不算稳定,接口用法还在不停调整和更新。所以这部分希望大家重在理解原理和基本用法,具体的API调用可以随时关注官网的更新变化。
一、直接上手
如果我们对关系型数据库和SQL非常熟悉,那么Table API和SQL的使用其实非常简单:只要得到一个“表”(Table),然后对它调用Table API,或者直接写SQL就可以了。接下来我们就以一个非常简单的例子上手,初步了解一下这种高层级API的使用方法。
1.1 需要引入的依赖
我们想要在代码中使用Table API,必须引入相关的依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
这里主要添加的依赖是一个“计划器”(planner),它是Table API的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的blink planner。由于Flink安装包的lib目录下会自带planner,所以在生产集群环境中提交的作业不需要打包这个依赖。而在Table API的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个Scala版流处理的相关依赖。
1.2 一个简单示例
有了基本的依赖,接下来我们就可以尝试在Flink代码中使用Table API和SQL了。比如,我们可以自定义一些Event类型的用户访问事件,作为输入的数据源;而后从中提取url地址和用户名user两个字段作为输出。如果使用DataStream API,我们可以直接读取数据源后,用一个简单转换算子map来做字段的提取。而这个需求直接写SQL的话,实现会更加简单:
select url, user from EventTable;
这里我们把流中所有数据组成的表叫作EventTable。在Flink代码中直接对这个表执行上面的SQL,就可以得到想要提取的数据了。在代码中具体实现如下:
public class TableExample { public static void main(String[] args) throws Exception { // 获取流执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 5 * 1000L), new Event("Cary", "./home", 60 * 1000L), new Event("Bob", "./prod?id=3", 90 * 1000L), new Event("Alice", "./prod?id=7", 105 * 1000L) ); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表 Table eventTable = tableEnv.fromDataStream(eventStream); // 用执行SQL 的方式提取数据 Table visitTable = tableEnv.sqlQuery("select url, user from " + eventTable); // 将表转换成数据流,打印输出 tableEnv.toDataStream(visitTable).print(); // 执行程序 env.execute(); } }
这里我们需要创建一个“表环境”(TableEnvironment),然后将数据流(DataStream)转换成一个表(Table);之后就可以执行SQL在这个表中查询数据了。
二、基本API
2.1 程序架构
在Flink中,Table API和SQL可以看作联结在一起的一套API,这套API的核心概念就是“表”(Table)。在我们的程序中,输入数据可以定义成一张表;然后对这张表进行查询,就可以得到新的表,这相当于就是流数据的转换操作;最后还可以定义一张用于输出的表,负责将处理结果写入到外部系统。程序基本架构如下:
// 创建表环境 TableEnvironment tableEnv = ...; // 创建输入表,连接外部系统读取数据 tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )"); // 注册一个表,连接到外部系统,用于输出 tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )"); // 执行SQL对表进行查询转换,得到一个新的表 Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... "); // 使用Table API对表进行查询转换,得到一个新的表 Table table2 = tableEnv.from("inputTable").select(...); // 将得到的结果写入输出表 TableResult tableResult = table1.executeInsert("outputTable");
这里不是从一个DataStream转换成Table,而是通过执行DDL来直接创建一个表。这里执行的CREATE语句中用WITH指定了外部系统的连接器,于是就可以连接外部系统读取数据了。这其实是更加一般化的程序架构,因为这样我们就可以完全抛开DataStream API,直接用SQL语句实现全部的流处理过程。
2.2 创建表环境
对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)。它主要负责:
(1)注册Catalog和表; (2)执行 SQL 查询; (3)注册用户自定义函数(UDF); (4)DataStream 和表之间的转换。
每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使用blink planner。
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() // 使用流处理模式 .build(); TableEnvironment tableEnv = TableEnvironment.create(setting);
对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
这里我们引入了一个“流式表环境”(StreamTableEnvironment),它是继承自TableEnvironment的子接口。调用它的create()方法,只需要直接将当前的流执行环境(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。
2.3 创建表
表(Table)是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。
1. 连接器表(Connector Tables)
最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。在代码中,我们可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。这里我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
这里的TEMPORARY关键字可以省略。关于连接器的具体定义,我们会在11.8节中展开讲解。
2. 虚拟表(Virtual Tables)
在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。由于newTable是一个Table对象,并没有在表环境中注册;所以如果希望直接在SQL中使用,我们还需要将这个中间结果表注册到环境中:
tableEnv.createTemporaryView("NewTable", newTable);
我们发现,这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图”(createTemporaryView)。
2.4 表的查询
创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。Flink为我们提供了两种查询方式:SQL,和Table API。
1. 执行SQL进行查询
基于表执行SQL语句,是我们最为熟悉的查询方式。在代码中,我们只要调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。
// 创建表环境 TableEnvironment tableEnv = ...; // 创建表 tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )"); // 查询用户Alice的点击事件,并提取表中前两个字段 Table aliceVisitTable = tableEnv.sqlQuery( "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' " );
目前Flink支持标准SQL中的绝大部分用法,并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来,像在MySQL、Hive中那样直接通过编写SQL实现自己的处理需求,从而大大降低了Flink上手的难度。例如,我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:
Table urlCountTable = tableEnv.sqlQuery( "SELECT user, COUNT(url) " + "FROM EventTable " + "GROUP BY user " );
上面的例子得到的是一个新的Table对象,我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:
// 注册表 tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )"); tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )"); // 将查询结果输出到OutputTable中 tableEnv.executeSql ( "INSERT INTO OutputTable " + "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' " );
2. 调用Table API进行查询
另外一种查询方式就是调用Table API。这是嵌入在Java和Scala语言内的查询API,核心就是Table接口类,通过一步步链式调用Table的方法,就可以定义出所有的查询转换操作。由于Table API是基于Table的Java实例进行调用的,因此我们首先要得到表的Java对象。基于环境中已注册的表,可以通过表环境的from()方法非常容易地得到一个Table对象:Table eventTable = tableEnv.from("EventTable"); 传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:
Table maryClickTable = eventTable .where($("user").isEqual("Alice")) .select($("url"), $("user"));
这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明了想要表达的内容;“$”符号用来指定表中的一个字段。上面的代码和直接执行SQL是等效的。Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。目前Table API支持的功能相对更少,可以预见未来Flink社区也会以扩展SQL为主,为大家提供更加通用的接口方式;所以我们接下来也会以介绍SQL为主,简略地提及Table API。
3. 两种API的结合使用
可以发现,无论是调用Table API还是执行SQL,得到的结果都是一个Table对象;所以这两种API的查询可以很方便地结合在一起。(1)无论是哪种方式得到的Table对象,都可以继续调用Table API进行查询转换;(2)如果想要对一个表执行SQL操作(用FROM关键字引用),必须先在环境中对它进行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:tableEnv.createTemporaryView("MyTable", myTable); 两种API殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而Table API功能相对较少、通用性较差,所以企业项目中往往会直接选择SQL的方式来实现需求。
2.5 输出表
表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。
// 注册表,用于输出数据到外部系统 tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )"); // 经过查询转换,得到结果表 Table result = ... // 将结果表写入已注册的输出表中 result.executeInsert("OutputTable");
在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。
2.6 表和流的转换
1. 将表(Table)转换成流(DataStream)
(1)调用toDataStream()方法 将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:
Table aliceVisitTable = tableEnv.sqlQuery( "SELECT user, url " + "FROM EventTable " + "WHERE user = 'Alice' " ); // 将表转换成数据流 tableEnv.toDataStream(aliceVisitTable).print();
(2)调用toChangelogStream()方法 urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。代码中需要调用的是表环境的toChangelogStream()方法:
Table urlCountTable = tableEnv.sqlQuery( "SELECT user, COUNT(url) " + "FROM EventTable " + "GROUP BY user " ); // 将表转换成更新日志流 tableEnv.toDataStream(urlCountTable).print();
2. 将流(DataStream)转换成表(Table)
(1)调用fromDataStream()方法 想要将一个DataStream转换成表也很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。例如,我们可以直接将事件流eventStream转换成一个表:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 读取数据源 SingleOutputStreamOperator<Event> eventStream = env.addSource(...) // 将数据流转换成表 Table eventTable = tableEnv.fromDataStream(eventStream);
由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
// 提取Event中的timestamp和url作为表中的列 Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp"), $("url"));
需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名:
// 将timestamp字段重命名为ts Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"), $("url"));
(2)调用createTemporaryView()方法 调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段
tableEnv.createTemporaryView("EventTable", eventStream, $("timestamp").as("ts"),$("url"));
这样,我们接下来就可以直接在SQL中引用表EventTable了。