开发者学堂课程【开源 Flink 极速上手教程:Flink Ecosystems】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/331/detail/3713
Flink Ecosystems(一)
内容介绍:
一.连接外部系统
二.常用 Connector
三.示例& Demo
一.连接外部系统
使用 SQL 的好处是 SQL 是一个标准化的数据查询的语言,很多做数据分析的同学通常习惯用 SQL 来做分析而不是去自己写一个程序,Flink 社区也是在 SQL 投入很多精力来做一系列开发的工作,在 Flink SQL 中可以很方便的与各种系统做一个集成,然后开发了很丰富的内置的操作符和函数,同时 Flink SQL 可以同时处理批数据和流数据,也可以极大的提高数据分析同学的工作效率。Flink SQL 还有一些场景还不是很好,首先如果需要定制化的或者细腻度的对作业的状态或者窗口进行控制,想使用自定义的 timer 等等,这种情况 Flink SQL 目前是支持不了的。另外,像跨版本的 savepoint 对于 Flink 的系统来说,跨版本的 savepoint 的兼容性目前无法保证,所以如果想要跨版本通过 savepoint 来升级是做不了的。了解了 Flink SQL 优缺点以后,可能会问我知道 Flink SQL 很强大但是为什么使用 Flink SQL 的时候需要对接外部系统。这个原因就是 Flink 本身是一个流计算的引擎,它本身是不维护任何数据,所以对 Flink SQL 而言,所有的数据都是存储在外部系统中,也就是所有的表都存储在外部系统中,只有对接这些外部系统,才能够对这些数据进行实际的读写。
在学习 Flink SQL 如何与外部系统对接之前,先学习在 Flink 内部 DaraStream 和 Table 如何转换。
这里做转换的好处就是假设已经有一个 DataStream 的程序,可以转成 Table 的方式来使用 Flink SQL 强大的功能来做一个查询。这个例子中假定有一个 DataStrea 叫做 sensorData ,这个 sensorData 是一个温度传感器传回来的数据流,它有三个字段分别是位置信息、采集温度的时间戳和具体的华氏度的温度。有了这样一个 DataStream 以后,可以通过 streamTableEnv 的createTemporaryView 的方法把这个DataStream 注册成一个 TemporaryView,并且指定每个字段的名称 location、rowtime 和 tempF。当有了这个 View 以后,就可以通过 streamTableEnv 来写SQL语句,对这个 View 进行查询,这个例子中按照位置查询每一天的平均温度。执行了 Query 以后,就会返回一张查询结果的表叫 avgTempCTable, TempC 指的是华氏度转成了摄氏度。有了这张表以后,还可以进一步把这张表再转回DadaStream,然后通过 DadaStream 程序对这个 DataStream 进行后续处理,这有点类似 Flink SQL 与内部的对接。
接下来正式学习 Flink SQL 如何对接外部系统
Name |
Version |
Source |
Sink |
Filesystem |
Bounded and Unbounded Scan, Lookup |
Streaming Sink,Batch Sink |
|
Elasticsearch |
6X&7X |
Not supported |
Streaming Sink,Batch Sink |
Apache Kafka |
0.01+ |
Unbounded Scan |
Steaming Sink,Batch Sink |
JDBC |
Bounded Scan,Lookup |
Steaming Sink,Batch Sink |
|
Apache Hbase |
1.4X |
Bounded Scan,Lookup |
Streaming Sink,Batch Sink |
这里有几个重要的概念先介绍一下,首先就是 Connector 这个概念,对于 Flink SQL 而言,对接外部系统的组件叫做 Connector,比如这张表里列出的 Flink 所支持的比较常用的 Connector。Filesystem 对接的就是文件系统,JDBC 对接的就是一个通过 JDBC 连接去对接的外部关系型数据库。每一个 Connector 主要负责实现 Source 和 Sink,Source 就是从外部系统中读数据,Sink 就是负责把数据写入到外部系统中,每一个 Connector 实现的 Source、Sink 功能上有区别,比如 Filesystem 支持 Bounded 和 Unbounded Scan,JDBC 只支持 Bounded Scan, Elasricsearch 这个 Connector 只实现了 Sink,没有实现 Source。
Formats |
Supported Connectors |
CSV |
Apache Kafka,Filesystem |
JSON |
Apache Kafka,Filesystem,Elasticsearch |
Apache Avro |
Apache Kafka,Filesystem |
Debezium CDC |
Apache Kafka |
Canal CDC |
Apache Kafka |
Apache Parquet |
Filesystem |
Apache ORC |
Filesystem |
除了 Connector 以外还有一个 Format 的概念也非常重要,Format 概念是数据在外部系统中的格式是什么样的,比如一张 Kafka 的表,在 Kafka 中可能是 CSV 的格式存储,还有可能是 JSON 的格式存储,所以通常在指定一个 Connector 连接外部表的时候,通常也需要指定 Format 是什么,这样 Flink 才能正确的读写数据。
接下来一个重要的概念就是 Catalog,Catalog 的功能是对接外部系统的源数据信息,连接外部系统的源数据,然后把源数据信息提供给 Flink,这样 Flink 可以直接访问外部系统中已经创建好的表或者 DadaBase 等等。比如用过 Hive的应该知道,Hive的源数据是存在Hive Metastore 中的,Flink 如果想要访问 Hive 表,需要 Hive 的源数据,有一个 HiveCatalog 对接 Hive 的源数据。
Catalog 的接口本身是一个通用化的设计,可以对接多种不同的外部系统,而且可以在一个用户 session 当中使用多个 Catalog。比如通过 SQL Client 的TableEnvironment 文件指定多个 Catalog 实例,当 TableEnvironment 创建之后,会把每一个指定的 Catalog 列出来,然后通过 CatalogManager 进行管理。使用Catalog 除了能获取外部系统源数据,还有一个好处就是可以帮助 Flink 持久化自身的源数据。比如 HiveCatalog 既可以帮 Flink 访问 Hive 表,也可以帮 Flink 存储一些 Flink 所创建的表的信息,比如通过 Flink 创建了一张 Kafka表,这张 Kafka 表可以把源数据信息放在 Hive Metastore 中,这样就不需要每次启动 session 的时候都需要重新建表,直接去读 Hive Metastore 中建好的表就可以。
了解了这三个概念以后,看一下如何创建一张表来指定一个外部的 Connector,这里给的例子是通过 DDL 来创建这样一张表。
可以看到是非常标准的 CREATE TABLE 语句,这张表的名字叫 MyUserTable,表的名字后面跟的就是 table schema,这个table schema 中有一些字段是Flink所特有的,比如 proctime、WATERMARK 等等,这是流计算特有的一些概念。schema 之后是位置语句,位置语句指定的是表的 properties,所有相关的参数都在位置语句中指定,比如'connector'='kafka'也就是说这张表是一张 Kafka 的表。指定了 connector 以后还可以指定一些每个 connector 所特有的一些属性,比如 Kafka 的表需要指定一个 topic, 告诉 Flink 这张表对应的 Kafka 的 topic 是什么。比如可以指定startup.mode,可以指定消费开始的点位,Format 用来指定表的数据格式。
通过 DDL 创建了一张表以后,表是如何在 Flink 当中被使用的,这里一个很关键的概念就是 Table Factory
这张图中黄色的框中通过 DDL 建表,或者这个表本身可以通过一个 Catalog 去外部系统当中拿到,这两种情况对 Flink 来说都是会被转成 CatalogTable 的对象。有了 CatalogTable 的对象之后,当在 SQL 语句中引用了CatalogTable,比如查询或者写入,Flink 会通过 Table Factory 为这张表创建对应的 Source 或者 Sink,读表需要创建Source,写表需要创建 Sink。创建了Source 和 Sink 之后,Source 和 Sink 就负责实际的读写。创建 Source 和 Sink的模块就叫做 Table Factory,所以很关键的就是需要根据 CatalogTable 的属性决定使用哪一种 Table Factory 来创建 Source 和 Sink。
获取 Table Factory 的方式有两种,一个是 Catalog 本身有一个Table Factory,比如 Hive Catalog 就是一个例子,Hive Catalog 绑定了 Hive 的 Table Factory,那么这个 Table Factory 就会为Hive 表创建 Source 和 Sink。另一种很常用的方式是通过 Java 的 SPI 来确定Table Factory。Catalog Table 中的参数,Flink 可以拿过来跟所有的 Table Factory做一个配对的工作,找到匹配指定的Table Factory,这个 Table Factory 会用来创建表的 Source 和 Sink。
这里需要强调一个点就是查找 Table Factory 的时候,必须是正好查到一个,才能正确返回结果,如果没有找到一个配对的 Table Factory,或者是找到了多个配对的Table Factory,那 Flink 都是会报错的。可能会看到错误提示说 Flink 没有办法为这张表找到一个合适的 Table Factory,这时有两个地方需要检查,一个是 Catalog 是不是缺少了对应 jar包,比如一张 Kafka 表,但是 Kafka 中忘记添加 Kafka 的 jar 包,所以 Table Factory 会找不到。另一种可能情况是 jar 包在但是 CatalogTable 创建时语句写错了,Table Factory 缺少了一个关键字段比如指定 topic,这时也会找不到合适的 Table Factory。
以上这些示例用到的语法都是Flink1.11中新改进的 Connector Options,如果看一下老的资料可能会用一些老的Connector Options,在参数的名字上和符合要求的参数上会更新得不太一样。比如这张图里跟例子里介绍的目的一样,创建一个表,但是使用老的 Connector Options,这时可以看到是'connector.type'='kafka',所以如果看到这种写法也是支持的,只不过使用的是老的写法。具体支持哪些老的 Connector Options,可以看官网的文档:
https:// ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html