五、读取文件创建表
TableEnvironment 可以调用.connect() 方法,连接外部系统,并调用.createTemporaryTable() 方法,在 Catalog 中注册表
tableEnv .connect(...) // 定义表的数据来源,和外部系统建立连接 .withFormat(...) // 定义数据格式化方法 .withSchema(...) // 定义表结构 .createTemporaryTable("MyTable") // 创建临时表
可以创建Table来描述文件数据,它可以从文件中读取,或者将数据写入文件
可以看到,我们从txt文件中读出六条数据,并以三元组的形式进行输出。
六、读取Kafka数据创建表
消费Kafka数据
七、表的查询 - Table API & SQL
Table API 是集成在 Scala 和 Java 语言内的查询API
Table API 基于代表“表”的Table类,并提供一整套操作处理的方法API;这些方法会返回一个新的Table对象,表示对输入表应用转换操作的结果
有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构
val sensorTable:Table = tableEnv.form("inputTable") val resultTable:Table = sensorTable .select("id,temperature") .filter("id = 'sensor_1'")
测试结果如下:
true / false —> 表示数据是否是新增 or 撤回回收 。
SQL 查询示例:
八、表和流的相互转换
将 DataStream 转换成表
对于一个DataStream,可以直接转换成Table,进而方便地调用 Table API 做转换操作
val dataStream:DataStream[SensorReading] = ... val sensorTable:Table = tableEnv.fromDataStream(dataStream)
默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来
val dataStream:DataStream[SensorReading] = ... val sensorTable = tableEnv.fromDataStream(dataStream, 'id,'timestamp,'temperature)
数据类型与Schema的对应
DataStream 中的数据类型,与表的 Schema 之间的对应关系,可以有两种:基于字段名称,或者基于字段位置
基于名称(name-based)
val sensorTable = tableEnv.formDataStream( 'timestamp as 'ts,'id as 'myId,'temperature)
基于位置(position-based)
val sensorTable = tableEnv.from
创建临时视图(Temporary View)
基于 DataStream 创建临时视图
tableEnv.createTemporaryView("sensorView",dataStream) tableEnv.create