文章目录
一、Table API 和 Flink SQL 是什么
二、基本程序结构
三、创建 TableEnvironment
四、表(Table)
五、读取文件创建表
六、读取Kafka数据创建表
七、表的查询 - Table API & SQL
八、表和流的相互转换
一、Table API 和 Flink SQL 是什么
- Flink 对批处理和流处理,提供了统一的上层API
- Table API 是一套内嵌在Java和Scala语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
- Flink的SQL支持基于实现了SQL标准的Apache Calcite
二、基本程序结构
Table API 和 SQL 的程序结构,与流式处理的程序结构十分类似
val tableEnv = ... // 创建表的执行环境 // 创建一张表,用于读取数据 tableEnv.connect(...).createTemporaryTable("inputTable") // 创建一张表,用于把计算结构输出 tableEnv.connect(...).createTemporaryTable("outputTable") // 通过 Table API 查询算子,得到一张结果表 val result = tableEnv.from("inputTable").select(...) // 通过 SQL 查询语句,得到一张结果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") // 将结果表写入输出表中 result.insertInto("outputTable")
三、创建 TableEnvironment
创建表的执行环境,需要将flink流处理的执行环境传入
val tableEnv = StreamTableEnvironment.create(env)
- TableEnvironment 是 flink 中集成Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment
- 注册 Catalog
- 在 Catalog 中注册表
- 执行 SQL 查询
- 注册用户自定义函数(UDF)
不同处理环境的定义:
四、表(Table)
- TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表
- 表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog 名、数据库(database)名和对象名
- 表可以是常规的,也可以是虚拟的(视图,view)
- 常规表(Table)一般可以用来描述外部数据,比如文件、数据库或消息队列的数据,也可以直接从 DataStream 转换而来
- 视图(View)可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果集