Table API Standard Structure

Example

Below is a standard example of a Table API program:
```java
public class FlinkTableStandardStructure {
    public static void main(String[] args) {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() //StreamingMode is the default
                //.inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //2. Create the source table: 1) read an external table; 2) create a table from a Table API or SQL query result
        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING())
                ),
                row(1, "zhangsan"),
                row(2L, "lisi")
        ).select($("id"), $("name"));

        //Register the table in the catalog (optional)
        tEnv.createTemporaryView("sourceTable", projTable);

        //3. Create the sink table
        final Schema schema = Schema.newBuilder()
                .column("id", DataTypes.DECIMAL(10, 2))
                .column("name", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sinkTable",
                TableDescriptor.forConnector("print")
                        .schema(schema)
                        .build());

        //4. Run Table API queries (multiple queries may be executed; intermediate tables may or may not be registered in the catalog)
        Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
        //Table resultTable = projTable.select($("id"), $("name"));

        //5. Emit the result (this also triggers execution; no separate tEnv.execute("job") call is needed)
        resultTable.executeInsert("sinkTable");
    }
}
```
Two Kinds of TableEnvironment

1. TableEnvironment

The standard approach is to create a TableEnvironment. When creating it, you can pass in an EnvironmentSettings or a TableConfig to configure some of its features. Note: starting with Flink 1.14 the other planners have been removed and only the Blink planner remains.

Capabilities (illustrated by the sketch after the snippet below):

- Register catalogs (a catalog can be thought of as a data-system instance, e.g. an HBase cluster or a MySQL server)
- Register databases and tables in a catalog
- Load pluggable modules
- Execute SQL queries
- Register UDFs
- Convert between DataStream and Table (only under a StreamExecutionEnvironment)
```java
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner() //since Flink 1.14 the other planners have been removed; only the Blink planner remains
        .inStreamingMode() //StreamingMode is the default
        //.inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
```
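The following is a minimal, self-contained sketch of a few of the capabilities listed above (registering a catalog, executing SQL DDL, and registering a UDF). The class name TableEnvCapabilitiesSketch, the catalog name my_catalog, the table MyPrintTable, and the MyUpper function are illustrative assumptions, not part of the original example.

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.functions.ScalarFunction;

public class TableEnvCapabilitiesSketch {

    //A trivial UDF used only to illustrate UDF registration
    public static class MyUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        //Register a catalog and switch to it (the name "my_catalog" is illustrative)
        tEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"));
        tEnv.useCatalog("my_catalog");

        //Execute SQL DDL to register a table in the current catalog/database
        tEnv.executeSql("CREATE TABLE MyPrintTable (word STRING) WITH ('connector' = 'print')");

        //Register a UDF
        tEnv.createTemporarySystemFunction("MyUpper", MyUpper.class);

        //Query inline values with the UDF and write to the print table; executeInsert triggers execution
        Table upper = tEnv.fromValues(
                DataTypes.ROW(DataTypes.FIELD("word", DataTypes.STRING())),
                row("hello"), row("flink")
        ).select(call("MyUpper", $("word")));
        upper.executeInsert("MyPrintTable");
    }
}
```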
2. StreamTableEnvironment

If you want to mix the DataStream API with the Table API/SQL, use a StreamTableEnvironment:

```java
//1. Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. Create the table execution environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
```
Use either TableEnvironment or StreamTableEnvironment; one of the two is enough.
Configuring the TableEnvironment
```java
// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
```
For the full list of configuration options, see: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/
Creating and Registering Tables

With a TableEnvironment in place, the next step is to create tables and, optionally, register them.

Two Ways to Create a Table

Tables are created so that Table API or SQL queries can be run against them. A table can be created in either of the following two ways:
| Creation method | Description | Code example |
| --- | --- | --- |
| Virtual table | Created from an existing Table, usually from the result of a Table API/SQL query; typically used for intermediate tables | `tEnv.createTemporaryView("sourceTable", projTable);` |
| Regular table | The more practical approach: create a table from external data such as a file, a database table, or a message queue; typically used for input/output tables and created via a connector | `tableEnv.createTable("SourceTableA", sourceDescriptor);` `tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);` |
Creating a Virtual Table

```java
Table projTable = tEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                DataTypes.FIELD("name", DataTypes.STRING())
        ),
        row(1, "zhangsan"),
        row(2L, "lisi")
).select($("id"), $("name"));

//createTemporaryView
tEnv.createTemporaryView("sourceTable", projTable);
```
Creating a Regular Table

```java
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
                .column("f0", DataTypes.STRING())
                .build())
        .option(DataGenOptions.ROWS_PER_SECOND, 100)
        .build();

tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);

// SQL DDL can also be used
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
```

Note: for the available connectors, see: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/
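To make the placeholder DDL above concrete, here is a minimal sketch of a complete CREATE TABLE statement using the datagen connector documented at the link above. The table name and columns are illustrative assumptions, and tableEnv is the environment from the snippet above.

```java
//A minimal sketch: a full DDL for a datagen-backed table (name and columns are illustrative)
tableEnv.executeSql(
        "CREATE TEMPORARY TABLE MyDataGenTable (" +
        "  f0 STRING," +
        "  f1 BIGINT" +
        ") WITH (" +
        "  'connector' = 'datagen'," +
        "  'rows-per-second' = '100'" +
        ")");
```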
Temporary and Permanent Tables

Registering a table makes an intermediate computation result available as a table, either for later use within the same Flink session or across Flink sessions.

Only regular tables (TABLES) are divided into temporary and permanent tables.
| Table type | Description |
| --- | --- |
| Temporary table | Accessible within the same Flink session; not accessible across Flink sessions |
| Permanent table | Accessible both within the same Flink session and across Flink sessions |
Example

```java
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
                .column("f0", DataTypes.STRING())
                .build())
        .option(DataGenOptions.ROWS_PER_SECOND, 100)
        .build();

//permanent table
tableEnv.createTable("SourceTableA", sourceDescriptor);
//temporary table
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
```
Table Identifiers

When registering a table you can specify a unique identifier with the format:

Catalog (catalog name).database (database name).objectname (table name)

If the catalog or database is not specified, the current defaults are used, i.e. the values set via tEnv.useCatalog("...") and tEnv.useDatabase("..."); if those have not been set, the default is default_catalog.default_database.
Example

```java
TableEnvironment tEnv = ...;

//switch the current Flink session to custom_catalog.custom_database
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

Table table = ...;

//register table as a view named exampleView in custom_catalog.custom_database
tEnv.createTemporaryView("exampleView", table);

//register table as a view named exampleView in custom_catalog.other_database
tEnv.createTemporaryView("other_database.exampleView", table);

//register table as a view named example.View in custom_catalog.custom_database; note the escaping
tEnv.createTemporaryView("`example.View`", table);

//register table as a view named exampleView in other_catalog.other_database
tEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
```
Table API Queries

Regular Queries

```java
/**
 * Performing an aggregation with the Flink Table API
 */
public class FlinkTableAggr {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        //3. Read the data
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);

        //5. Run the Table API / SQL query
        /**
         * select
         *   user,
         *   count(url) as cnt
         * from clicks
         * group by user
         */
        Table resultTable = table
                .groupBy($("user"))
                .aggregate($("url").count().as("cnt"))
                .select($("user"), $("cnt"));
        //System.out.println(resultTable.explain());

        //6. Convert the Table back into a DataStream
        //an aggregation must be converted as a retract stream
        DataStream<Tuple2<Boolean, Row>> selectedClickLogs = tEnv.toRetractStream(resultTable, Row.class);

        //7. Handle the result: print/emit
        selectedClickLogs.print();

        //8. Execute
        env.execute("FlinkTableAggr");
    }
}
```
Mixing Table API and SQL Queries

```java
public class MixTableAPIAndSQL {
    public static void main(String[] args) {
        //1. Create the TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        //2. Create the source table
        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("user", DataTypes.STRING()),
                        DataTypes.FIELD("url", DataTypes.STRING()),
                        DataTypes.FIELD("cTime", DataTypes.STRING())
                ),
                row("Mary", "./home", "2022-02-02 12:00:00"),
                row("Bob", "./cart", "2022-02-02 12:00:00"),
                row("Mary", "./prod?id=1", "2022-02-02 12:00:05"),
                row("Liz", "./home", "2022-02-02 12:01:00"),
                row("Bob", "./prod?id=3", "2022-02-02 12:01:30"),
                row("Mary", "./prod?id=7", "2022-02-02 12:01:45")
        ).select($("user"), $("url"), $("cTime"));

        //Register the table in the catalog (optional)
        tEnv.createTemporaryView("sourceTable", projTable);

        //3. Create the sink table
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sinkTable",
                TableDescriptor.forConnector("print")
                        .schema(schema)
                        .build());

        //4. Mix the Table API and SQL (note the backticks: user is a reserved keyword in SQL)
        Table resultTable = tEnv.from("sourceTable").select($("user"), $("url"));
        tEnv.createTemporaryView("resultTableView", resultTable);
        Table result = tEnv.sqlQuery("select * from resultTableView where `user` = 'Mary'");

        //5. Emit the result (this also triggers execution; no separate tEnv.execute("job") call is needed)
        result.executeInsert("sinkTable");
        result.printSchema();
    }
}
```
Mixing the Table API and DataStream

```java
package com.blink.sb;

import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class MixFlinkTableAndDataStream {
    public static void main(String[] args) throws Exception {
        //1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //3. Read the data (source)
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });

        //4. Convert the stream into a dynamic table
        Table table = tEnv.fromDataStream(clickLogs);

        //5. Run the Table API / SQL query
        Table resultTable = table
                .where($("user").isEqual("Mary"))
                .select($("user"), $("url"), $("cTime"));

        //6. Convert the Table back into a DataStream
        DataStream<ClickLogs> selectedClickLogs = tEnv.toDataStream(resultTable, ClickLogs.class);

        //7. Handle the result: print/emit
        selectedClickLogs.print();

        //8. Execute
        env.execute("FlinkTableFirstExample");
    }
}
```
Table API Connectors (Built-in Connectors)

Flink's Table API & SQL connect to external systems through connectors and read/write them in batch or streaming mode. A rich set of connectors for external systems is provided; depending on the source/sink type, they support different formats such as CSV, Avro, Parquet, or ORC. The built-in connectors currently supported (as of Flink 1.14.3) are listed in the documentation linked below.

Note: when used as a sink, also pay attention to the supported output modes (Append/Retract/Upsert).

Reference: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/
Table API Input (Kafka) and Output (Kafka)

```java
package com.blink.sb;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;

import java.util.concurrent.ExecutionException;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkTableAPIKafka2Kafka {
    public static final String input_topic = "clicklog_input";
    public static final String output_topic = "clicklog_output";
    public static final String output_topic_change = "clicklog_output_change";

    public static void main(String[] args) throws Exception {
        //1. Create the TableEnvironment
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //2. Create the Kafka source table
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .column("cTime", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sourceTable",
                TableDescriptor.forConnector("kafka")
                        .schema(schema)
                        .format("json")
                        .option("topic", input_topic)
                        .option("properties.bootstrap.servers", "node02:6667")
                        .option("properties.group.id", "testGroup")
                        .option("scan.startup.mode", "latest-offset") //start consuming from the latest offset
                        .build());

        //3. Create the Kafka sink table
        final Schema sinkSchema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .column("cTime", DataTypes.STRING())
                //.column("cnt", DataTypes.BIGINT())
                .build();
        tEnv.createTemporaryTable("sinkTable",
                TableDescriptor.forConnector("kafka")
                        .schema(sinkSchema)
                        .format("csv")
                        .option("topic", output_topic)
                        .option("properties.bootstrap.servers", "node02:6667")
                        .build());

        //4. Read from sourceTable via the Table API and write to sinkTable in CSV format
        /**
         * If you get: Could not find any factory for identifier 'kafka' that implements
         * 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
         * Cause: the flink-connector-kafka_${scala.binary.version} dependency is missing (it also provides upsert-kafka).
         * See https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
         *
         * This pipeline is insert-only.
         */
        tEnv.from("sourceTable")
                .select($("user"), $("url"), $("cTime"))
                .executeInsert("sinkTable");

        /**
         * Table sink 'default_catalog.default_database.sinkTable' doesn't support consuming update changes
         * which is produced by node GroupAggregate(groupBy=[user, url], select=[user, url, COUNT(url) AS EXPR$0])
         * A grouped aggregation cannot be written into this insert-only sink table.
         *
         * Workaround: convert the aggregated Table into a changelog DataStream and sink that DataStream to Kafka.
         */
        Table select = tEnv.from("sourceTable")
                .groupBy($("user"), $("url"))
                .select($("user"), $("url").count().as("cnt"));

        DataStream<Row> rowDataStream = tEnv.toChangelogStream(select,
                Schema.newBuilder()
                        .column("user", DataTypes.STRING())
                        .column("cnt", DataTypes.BIGINT()).build(),
                ChangelogMode.all());

        DataStream<String> stringDataStream = rowDataStream.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row row) throws Exception {
                return "xxxx"; //placeholder serialization of the changelog row
            }
        });

        stringDataStream.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers("node02:6667")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(output_topic_change)
                        .setKafkaValueSerializer(StringSerializer.class)
                        .build())
                .build());

        //Execute the DataStream part of the pipeline
        env.execute("FlinkTableAPIKafka2Kafka");
    }
}
```
Advanced Kafka Source/Sink Features (more options)

Advanced features can be enabled through additional options. The options supported by the Kafka connector include the following.

Key and Value Formats

A format can be specified for both the key and the value of a Kafka message.

- Specifying only the value format

The key of a Kafka message is optional, so you can specify only the value format:
tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") .schema(schema) .format("json") ... .build());
- Specifying both key and value formats
tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") .schema(schema) // .format("json") .option("topic",input_topic) .option("key.format","json") .option("value.format","json") ... .build())
注意:format("json")和option("value.format","json")⼆选⼀,⼆者等价
Topic and Partition Discovery

Topics can be configured via either topic or topic-pattern.

| Option | Source | Sink |
| --- | --- | --- |
| topic | A single topic or a semicolon-separated topic list, e.g. topic-1;topic-2. topic and topic-pattern are mutually exclusive. | Note: a topic list is not supported when used as a sink |
| topic-pattern | A regular expression on topic names used to subscribe to multiple topics, e.g. test-topic-[0-9]. topic and topic-pattern are mutually exclusive. | Cannot be used for a sink |
Note: to allow topics created after the job has started to be discovered, set scan.topic-partition-discovery.interval to a non-negative value.
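The following is a minimal sketch combining topic-pattern with partition discovery, reusing the schema and broker address from the earlier example; the pattern clicklog_.* and the 30 s interval are illustrative assumptions.

```java
//A minimal sketch: subscribe to all topics matching a pattern and re-scan for new ones every 30 s
tEnv.createTemporaryTable("sourceTable",
        TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
                .option("topic-pattern", "clicklog_.*")                    //illustrative pattern
                .option("properties.bootstrap.servers", "node02:6667")
                .option("properties.group.id", "testGroup")
                .option("scan.topic-partition-discovery.interval", "30 s") //enable dynamic topic/partition discovery
                .build());
```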
Source Start Position

When used as a source, the scan.startup.mode option specifies the position from which to start consuming. The possible values are:

| Value | Description |
| --- | --- |
| group-offsets (default) | Start from the offsets committed to ZK/Kafka by the consumer group. Flink 1.14 has a bug here: https://issues.apache.org/jira/browse/FLINK-2469 |
| earliest-offset | Start consuming from the earliest offset (every time) |
| latest-offset | Start consuming from the latest offset (every time) |
| specific-offsets | Start from user-specified offsets for each partition; must be used together with scan.startup.specific-offsets, e.g. partition:0,offset:42;partition:1,offset:300 |
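As an illustration of the table above, the sketch below starts a source from user-specified per-partition offsets; it reuses the schema and constants from the earlier Kafka example, and the offsets themselves are illustrative.

```java
//A minimal sketch: start reading from explicit per-partition offsets
tEnv.createTemporaryTable("sourceTable",
        TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
                .option("topic", input_topic)
                .option("properties.bootstrap.servers", "node02:6667")
                .option("scan.startup.mode", "specific-offsets")
                .option("scan.startup.specific-offsets", "partition:0,offset:42;partition:1,offset:300")
                .build());
```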
Sink Partitioning

When Kafka is used as a sink, the partitioner can be specified via sink.partitioner.

| Value | Description |
| --- | --- |
| default | Use Kafka's default partitioner |
| fixed | Each Flink partition writes to at most one Kafka partition, which can reduce network overhead |
| round-robin | Only useful when the Kafka messages have no key |
| A custom FlinkKafkaPartitioner subclass | Provide the fully qualified class name of the subclass |
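Below is a minimal sketch of a sink table that pins the partitioning strategy via sink.partitioner, reusing the sink schema and topic from the earlier example; choosing fixed here is an arbitrary illustration.

```java
//A minimal sketch: each Flink partition writes to at most one Kafka partition
tEnv.createTemporaryTable("sinkTable",
        TableDescriptor.forConnector("kafka")
                .schema(sinkSchema)
                .format("csv")
                .option("topic", output_topic)
                .option("properties.bootstrap.servers", "node02:6667")
                .option("sink.partitioner", "fixed")
                .build());
```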
Consistency Guarantees

By default, with checkpointing enabled, the Kafka sink provides at-least-once semantics. With checkpointing enabled, the guarantee can be adjusted via sink.delivery-guarantee:

| Value | Description |
| --- | --- |
| none | Flink gives no guarantee; records may be lost or duplicated |
| at-least-once (default) | At-least-once delivery; records may be duplicated |
| exactly-once | Exactly-once semantics via Kafka transactions |

Once transactions are enabled for exactly-once semantics, make sure downstream consumers set isolation.level to read_committed (the default is read_uncommitted).
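The following is a minimal sketch of an exactly-once Kafka sink table, reusing the sink schema and topic from the earlier example. It assumes checkpointing is enabled on the execution environment; the checkpoint interval and the sink.transactional-id-prefix value (which the connector requires for exactly-once) are illustrative assumptions.

```java
//A minimal sketch: exactly-once Kafka sink (requires checkpointing to be enabled)
env.enableCheckpointing(60_000); //illustrative 60 s checkpoint interval

tEnv.createTemporaryTable("sinkTable",
        TableDescriptor.forConnector("kafka")
                .schema(sinkSchema)
                .format("csv")
                .option("topic", output_topic)
                .option("properties.bootstrap.servers", "node02:6667")
                .option("sink.delivery-guarantee", "exactly-once")
                .option("sink.transactional-id-prefix", "clicklog-sink") //required for exactly-once; value is illustrative
                .build());
```

On the consuming side, read_committed can be passed through as a Kafka consumer property, e.g. .option("properties.isolation.level", "read_committed") on the source table, so that uncommitted records are not visible.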