(1)创建表的两种方式
虚拟表
在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:
// get a TableEnvironment TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // table is the result of a simple projection query Table projTable = tableEnv.from("X").select(...); // register the Table projTable as table "projectedTable" tableEnv.createTemporaryView("projectedTable", projTable);
注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)。
Connector Tables
另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
tableEnvironment .connect(...) .withFormat(...) .withSchema(...) .inAppendMode() .createTemporaryTable("MyTable")
(2)DataStream、DataSet与Table的转换
(2.1)通过 DataSet 或 DataStream 创建视图
在 TableEnvironment 中可以将 DataStream 或 DataSet 注册成视图。结果视图的 schema 取决于注册的 DataStream 或 DataSet 的数据类型。
注意: 通过 DataStream 或 DataSet 创建的视图只能注册成临时视图。
// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section DataStream<Tuple2<Long, String>> stream = ... // register the DataStream as View "myTable" with fields "f0", "f1" tableEnv.createTemporaryView("myTable", stream); // register the DataStream as View "myTable2" with fields "myLong", "myString" tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
(2.2)将 DataStream 或 DataSet 转换成表
与在 TableEnvironment 注册 DataStream 或 DataSet 不同,DataStream 和 DataSet 还可以直接转换成 Table。如果你想在 Table API 的查询中使用表,这将非常便捷。
// get StreamTableEnvironment // registration of a DataSet in a BatchTableEnvironment is equivalent StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section DataStream<Tuple2<Long, String>> stream = ... // Convert the DataStream into a Table with default fields "f0", "f1" Table table1 = tableEnv.fromDataStream(stream); // Convert the DataStream into a Table with fields "myLong", "myString" Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
(2.3)将表转换成 DataStream 或 DataSet
Table 可以被转换成 DataStream 或 DataSet。
将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即,Table 的每行数据要转换成的数据类型。
Row: 字段按位置映射,字段数量任意,支持 null 值,无类型安全(type-safe)检查。
POJO: 字段按名称映射(POJO 必须按Table 中字段名称命名),字段数量任意,支持 null 值,无类型安全检查。
Case Class: 字段按位置映射,不支持 null 值,有类型安全检查。
Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持 null 值,无类型安全检查。
Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。
(2.3.1)将表转换成 DataStream
流式查询(streaming query)的结果表会动态更新,即,当新纪录到达查询的输入流时,查询结果会改变。因此,像这样将动态查询结果转换成 DataStream 需要对表的更新方式进行编码。
将 Table 转换为 DataStream 有两种模式:
Append Mode: 仅当动态 Table 仅通过INSERT更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
Retract Mode: 任何情形都可以使用此模式。它使用 boolean 值对 INSERT 和 DELETE 操作的数据进行标记。
// get StreamTableEnvironment. StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // Table with two fields (String name, Integer age) Table table = ... // convert the Table into an append DataStream of Row by specifying the class DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); // convert the Table into an append DataStream of Tuple2<String, Integer> // via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
注意 一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。
将 Table 转换成 DataSet 的过程如下:
// get BatchTableEnvironment BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); // Table with two fields (String name, Integer age) Table table = ... // convert the Table into a DataSet of Row by specifying a class DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); // convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
注意 一旦 Table 被转化为 DataSet,必须使用 ExecutionEnvironment 的 execute 方法执行该 DataSet 作业。
(3)数据类型与Table Schema映射
Flink 的 DataStream 和 DataSet APIs 支持多样的数据类型。例如 Tuple(Scala 内置以及Flink Java tuple)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。
数据类型到 table schema 的映射有两种方式:基于字段位置或基于字段名称。
基于位置映射
基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射(参见下一章)。可以将字段投影出来,但不能使用as重命名。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section; DataStream<Tuple2<Long, Integer>> stream = ... // convert DataStream into Table with default field names "f0" and "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field "myLong" only Table table = tableEnv.fromDataStream(stream, $("myLong")); // convert DataStream into Table with field names "myLong" and "myInt" Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myInt"));
基于名称的映射
基于名称的映射适用于任何数据类型包括 POJO 类型。这是定义 table schema 映射最灵活的方式。映射中的所有字段均按名称引用,并且可以通过 as 重命名。字段可以被重新排序和映射。
若果没有指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section DataStream<Tuple2<Long, Integer>> stream = ... // convert DataStream into Table with default field names "f0" and "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field "f1" only Table table = tableEnv.fromDataStream(stream, $("f1")); // convert DataStream into Table with swapped fields Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
(3.1)原子类型
Flink 将基础数据类型(Integer、Double、String)或者通用数据类型(不可再拆分的数据类型)视为原子类型。原子类型的 DataStream 或者 DataSet 会被转换成只有一条属性的 Table。属性的数据类型可以由原子类型推断出,还可以重新命名属性。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section DataStream<Long> stream = ... // convert DataStream into Table with default field name "f0" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with field name "myLong" Table table = tableEnv.fromDataStream(stream, $("myLong"));
(3.2)Tuple类型
Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类型。两种 tuple 的 DataStream 和 DataSet 都能被转换成表。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section DataStream<Tuple2<Long, String>> stream = ... // convert DataStream into Table with default field names "f0", "f1" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed field names "myLong", "myString" (position-based) Table table = tableEnv.fromDataStream(stream, $("myLong"), $("myString")); // convert DataStream into Table with reordered fields "f1", "f0" (name-based) Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); // convert DataStream into Table with projected field "f1" (name-based) Table table = tableEnv.fromDataStream(stream, $("f1")); // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) Table table = tableEnv.fromDataStream(stream, $("f1").as("myString"), $("f0").as("myLong"));
(3.3)POJO 类型
在不指定字段名称的情况下将 POJO 类型的 DataStream 或 DataSet 转换成 Table 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 as 关键字)来重命名,重新排序和投影。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // Person is a POJO with fields "name" and "age" DataStream<Person> stream = ... // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) Table table = tableEnv.fromDataStream(stream, $("age").as("myAge"), $("name").as("myName")); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, $("name")); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));
(3.4)Row类型
Row 类型支持任意数量的字段以及具有 null 值的字段。字段名称可以通过 RowTypeInfo 指定,也可以在将 Row 的 DataStream 或 DataSet 转换为 Table 时指定。Row 类型的字段映射支持基于名称和基于位置两种方式。字段可以通过提供所有字段的名称的方式重命名(基于位置映射)或者分别选择进行投影/排序/重命名(基于名称映射)。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` DataStream<Row> stream = ... // convert DataStream into Table with default field names "name", "age" Table table = tableEnv.fromDataStream(stream); // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) Table table = tableEnv.fromDataStream(stream, $("myName"), $("myAge")); // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) Table table = tableEnv.fromDataStream(stream, $("name").as("myName"), $("age").as("myAge")); // convert DataStream into Table with projected field "name" (name-based) Table table = tableEnv.fromDataStream(stream, $("name")); // convert DataStream into Table with projected and renamed field "myName" (name-based) Table table = tableEnv.fromDataStream(stream, $("name").as("myName"));