开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :待待深度探索 Flink SQL(一)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10041
待待深度探索 Flink SQL(一)
内容介绍:
一.New TableEnvironment
二.New Catalog & DDL
三.Blink Planner
四.Q&A
一.New TableEnvironment
1.TableEnvironment 整体设计
FLIP-32 中提出,将 Blink 完全开源,合并到 Flink 主分支中。
合并后在 Flink 1.9 中会存在两个 Planner:Flink Planner 和 Blink Planner。
这两个 planner 中有很大一部分的代码是共享的。
Flink table 是从 flink-libraries 这一二级目录移出来的,相当于在之前的版本中,Flink Table 在整个 Flink 中是一个二等公民。
Flink SQL 越来越受重视,Flink Table 模块也因此被提升为一等公民。
而 Blink 在设计之初就考虑到流和批的统一,批只是流的一种特殊形式,所以可以用同一个 TableEnvironment 来表述流和批。
可以看出,TableEnvironment 组成部分如下:
① flink-table-common:
这个包中主要是包含 Flink Planner 和 Blink Planner 一些共用的代码。
② flink-table-api-java:
这部分是用户编程使用的 API,包含了大部分的 API。
③ flink-table-api-scala:
这里只是非常薄的一层,仅和 Table API 的 Expression 和 DSL 相关。
④两个 Bridge:
flink-table-api-scala-bridge 和 flink-table-api-java-bridge,从图中可以看出,Flink Planner 和 Blink Planner 都会依赖于具体的 JAVA API,也会依赖于具体的 Bridge,通过 Bridge 可以将 API 操作相应的转化为 Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者 Data Set。
2.新旧 TableEnvironment 对比
在 Flink 1.9 之前,原来的 Flink Table 模块,有 7 个 Environment,使用和维护上相对困难。
7 个 Environment 包括:StreamTableEnvironment、BatchTableEnvironment 两类,JAVA 和 Scala 分别 2 个,一共 4 个,加上 3 个父类,一共就是 7 个。
在新的框架之下,社区希望流和批统一,因此对原来的设计进行精简。
首先,提供统一的 TableEnvironment,放在 flink-table-api-java 这个包中。
然后,在 Bridge 中,提供了两个用于衔接 Scala DataStream 和 Java DataStream 的StreamTableEnvironment。
最后,因为 Flink Planner 中还残存在着 toDataSet 类似的操作,所以,暂时保留 BatchTableEnvironment。
这样,目前一共是 5 个 TableEnvironment。
因为未来 Flink Planner 将会被移除,BatchTableEnvironment 就会被废弃,整个 TableEnvironment 的设计也会更加简洁明了。
3. 新 TableEnvironment 的应用
第一行,简单起见,在后续将新的 TableEnvironment 称为 UnifyTableEnvironment。
在 Blink 中,Batch 被认为是 Stream 的一个特例,因此 Blink 的 Batch 可以使用 UnifyTableEnvironment。
UnifyTableEnvironment 在 1.9 中有一些限制,比如它不能够注册 UDAF 和 UDTF,当前新的 Type System 的类型推导功能还没有完成(Java、Scala 的类型推导还没统一),所以这部分的功能暂时不支持。
此外,UnifyTableEnvironment 无法和 DataStream 和 DataSet 互转。
第二行,Stream TableEnvironment 支持转化成 DataStream,也可以注册 UDAF 和 UDTF。
如果是 JAVA 写的,就注册到 JAVA 的 StreamTableEnvironment,如果是用 Scala 写的,就注册到 Scala 的 StreamTableEnvironment。
注意,Blink Batch 作业不支持 Stream TableEnvironment ,因为目前 Batch 没法和 DataStream 互转,所以 toDataStream 这样的语义暂时不支持。
从图中也可以看出,目前Blink Batch只能使用 TableEnvironment。
最后一行,BatchTableEvironment 能够使用 toDataSet 转化为 DataSet。
从上图中,可以很清晰的看出各个 TableEnvironment 能够做什么事情,以及他们有哪些限制。
接下来,将使用示例对各种情况进行说明。
示例1:Blink Batch
EnvironmentSettings settings = EnvironmentSettings.newInstance
(
).useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(“job name”);
Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代码中,首先需要创建一个 EnvironmentSetting,同时指定使用 Blink Planner,并且指定用 Batch 模式。
之所以需要指定 Blink Planner,是因为目前 Flink 1.9 中,将 Flink Planner 和 Blink Planner 的 jar 同时放在了 Flink 的 lib 目录下。
如果不指定使用的 Planner,整个框架并不知道需要使用哪个 Planner, 所以必须显示指定。
如果整个框架只有一个blink或者flink,这时候是可以不用加载的。因为通过服务发现的方式可以找到唯一一个planner直接work,如果找到多一个就可以直接去报错。
在 UnifyEnvironment 中,用户是无法获取到 ExecutionEnvironment 的,即用户无法在写完作业流程后,使用 executionEnvironment.execute 方法启动任务。需要显式的使用 tableEnvironment.execute 方法启动任务,这和之前的作业启动很不相同。
示例 2:Blink Stream
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = … StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);
tEnv…
Blink Stream 既可以使用 UnifyTableEnvironment,也可以使用 StreamTableEnvironment,与 Batch 模式基本类似,只是需要将 inBatchMode 换成 inStreamingMode。
示例 3:Flink Batch
ExecutionEnvironment execEnv= ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
tEnv...
与之前没有变化,不做过多介绍。
示例 4:Flink Stream
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(“job name”);
Flink Stream 也是同时支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 时,需要指定为 useOldPlanner,也即 Flink Planner。
因为未来 Flink Planner 会被移除,因此,特意起了一个 OlderPlanner 的名字,而且只能够使用 inStreamingMode,无法使用 inBatchMode。因为inBatchMode会通过BatchTableEnvironment创建。
二.New Catalog & DDL
1. 新 Catalog 设计
构建一个新的 Catalog API 主要是 FLIP-30 提出的,之前的 ExternalCatalog 将被废弃,Blink Planner 中已经不支持 ExternalCatalog,而Flink Planner 还支持 ExternalCatalog。
下图是新 Catalog 的整体设计:
可以看到,新的 Catalog 有三层结构,最顶层是 Catalog 的名字,中间一层是 Database,最底层是各种 MetaObject,如 Table,Partition,Function 等。
当前,内置了两个 Catalog 实现:MemoryCatalog 和 HiveCatalog。当然,用户也可以实现自己的 Catalog。
Catalog 能够做什么事情呢?
首先,它可以支持 Create,Drop,List,Alter,Exists 等语句,另外它也支持对 Database,Table,Partition,Function,Statistics 等的操作。基本上,常用的 SQL 语法都已经支持了。
CatalogManager 正如它名字一样,主要是用来管理 Catalog,且可以同时管理多个 Catalog。也就是说,可以通过在一个相同 SQL 中,跨 Catalog 做查询或者关联操作。
例如,支持对 A Hive Catalog 和 B Hive Catalog 做相互关联,这给 Flink 的query带来了很大的灵活性。
CatalogManager 支持的操作包括:
· 注册 Catalog(registerCatalog)
· 获取所有的 Catalog(getCatalogs)
· 获取特定的 Catalog(getCatalog)
· 获取当前的 Catalog(getCurrentCatalog)
· 设置当前的 Catalog(setCurrentCatalog)
· 获取当前的 Database(getCurrentDatabase)
· 设置当前的 Database(setCurrentDatabase)
Catalog 虽然设计了三层结构,但在使用的时候,并不需要完全指定三层结构的值,可以只写Table Name。
这时候,系统会使用 getCurrentCatalog,getCurrentDatabase 获取到默认值,自动补齐三层结构,这种设计简化了对 Catalog 的使用。
如果需要switch默认的 Catalog,只需要调用 setCurrentCatalog 就可以了。
在 TableEnvironment 层,提供了操作 Catalog 的方法,例如:
· 注册 Catalog(registerCatalog)
· 列出所有的 Catalog(listCatalogs)
· 获取指定 Catalog(getCatalog)
· 使用某个 Catalog(useCatalog)
在 SQL Client 层,也做了一定的支持,但是功能有一定的限制。
用户不能够使用 Create 语句直接创建 Catalog,只能通过在 yaml 文件中,通过定义 Deion 的方式去描述 Catalog,然后在启动 SQL Client 的时候,通过传入 -e把yaml文件传给sqlClient,让它去加载。
目前 SQL Client 支持列出已定义的 Catalog,使用一个已经存在的 Catalog 等操作。
从纵向的角度看各个catalog在各个层级里面有哪些操作,可以帮助大家更好的认识和使用catalog。
2.DDL 设计与使用
有了 Catalog,就可以使用 DDL 来操作 Catalog 的内容,可以使用 TableEnvironment 的 sqlUpdate 方法执行 DDL 语句,也可以在 SQL Client 执行 DDL 语句。
sqlUpdate 方法中,支持 Create Table、Create View、Drop Table、Drop View 四个命令。
当然,inset into 这样的语句也是支持的。
下面分别对 4 个命令进行说明:
① Create Table:
可以显示的指定 Catalog Name 或者 DB Name,如果缺省,就按照用户设定的 Current Catalog补齐,然后可以指定字段名称,字段的说明,也可以支持 Partition By 语法。
最后是With 参数,用户可以在此处指定使用的 Connector,例如Kafka,CSV,HBase 等。With 参数需要配置一堆的属性值,可以从各个 Connector 的 Factory 定义中找到。Factory 中会指出有哪些必选属性,哪些可选属性值。
需要注意的是,目前 DDL 中还不支持计算列和 Watermark 的定义,后续的版本中将会继续完善这部分。
Create Table [[catalog_name.]db_name.]table_name(
a int comment 'column comment',
b bigint,
c varchar
)comment 'table comment'
[partitioned by(b)]
With(
update-mode='append',
connector.type='kafka',
... )
② Create View
需要指定 View 的名字,然后紧跟着的是 SQL。View 将会存储在 Catalog 中。
CREATE VIEW view_name AS SELECT xxx
③ Drop Table&Drop View:
和标准 SQL 语法差不多,支持使用 IF EXISTS 语法,如果未加 IF EXISTS ,Drop 一个不存在的表,会抛出异常。
DROP TABLE
[IF EXISTS] [[catalog_name.]db_name.]table_name
④ SQL Client 中执行 DDL:
大部分都只支持查看操作,仅可以使用 Create View 和 Drop View。Catalog,Database,Table ,Function 这些只能做查看。
用户可以在 SQL Client 中 Use 一个已经存在的 Catalog,修改一些属性,或者做 Deion,Explain 这样的一些操作。
CREATE
VIEW
DROP
VIEW
SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS
USE CATALOG xxx
SET xxx=yyy
DESCRIBE table_name
EXPLAIN SELECT xxx
DDL 部分,在 Flink 1.9 中其实基本已经成型,只是还有一些特性,在未来需要逐渐的完善。