Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337
4.5、输出表
表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。
在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。
// 注册表,用于输出数据到外部系统 tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )"); // 经过查询转换,得到结果表 Table result = ... // 将结果表写入已注册的输出表中 result.executeInsert("OutputTable");
在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class SqlDemo { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO 1.创建表环境 // 1.1 写法1: // EnvironmentSettings es = EnvironmentSettings.newInstance() // .inStreamingMode() // .build(); // TableEnvironment table_env = TableEnvironment.create(es); // 1.2 写法2: StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 2. 创建表 table_env.executeSql("CREATE TABLE source ( \n" + " id INT, \n" + " ts BIGINT, \n" + " vc INT\n" + ") WITH ( \n" + " 'connector' = 'datagen', \n" + " 'rows-per-second'='1', \n" + " 'fields.id.kind'='random', \n" + " 'fields.id.min'='1', \n" + " 'fields.id.max'='10', \n" + " 'fields.ts.kind'='sequence', \n" + " 'fields.ts.start'='1', \n" + " 'fields.ts.end'='1000000', \n" + " 'fields.vc.kind'='random', \n" + " 'fields.vc.min'='1', \n" + " 'fields.vc.max'='100'\n" + ");\n"); table_env.executeSql("CREATE TABLE sink (\n" + " id INT, \n" + " sum_vc INT \n" + ") WITH (\n" + "'connector' = 'print'\n" + ");\n"); // TODO 3. 执行查询 查询的结果也是一张表 // 3.1 使用sql进行查询 Table table = table_env.sqlQuery("select id,sum(vc) as sum_vc from source where id > 5 group by id;"); // 把 table 对象注册成为表名 table_env.createTemporaryView("tmp",table); // table_env.sqlQuery("select * from tmp;"); // 3.2 用table api查询 // Table source = table_env.from("source"); // Table result = source // .where($("id").isGreater(5)) // .groupBy($("id")) // .aggregate($("vc").sum().as("sum_vc")) // .select($("id"), $("sum_vc")); // TODO 4. 输出表 // 4.1 sql用法 table_env.executeSql("insert into sink select * from tmp"); // 4.2 table api // result.executeInsert("sink"); } }
运行结果:
4.6 表和流的转换
既然都不用 SQL 了,我们不可能用 API 只是为了没事找事干,而是为了方便结合一些底层的 API,比如我们之前学的 DataSream API。
4.6.1、将流(DataStream)转换成表(Table)
1. 调用fromDataStream()方法
想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 读取数据源 SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...) // 将数据流转换成表 Table sensorTable = tableEnv.fromDataStream(sensorDS);
由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。
另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:
// 提取Event中的timestamp和url作为表中的列 Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));
也可以通过表达式的as()方法对字段进行重命名:
// 将timestamp字段重命名为ts Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));
(2)调用createTemporaryView()方法
调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。
对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段
tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));
这样,我们接下来就可以直接在SQL中引用表sensorTable了。
4.6.2、将表(Table)转换成流(DataStream)
(1)调用toDataStream()方法
将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:
tableEnv.toDataStream(table).print();
(2)调用toChangelogStream()方法
urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。
代码中需要调用的是表环境的toChangelogStream()方法:
Table table = tableEnv.sqlQuery( "SELECT id, sum(vc) " + "FROM source " + "GROUP BY id " ); // 将表转换成更新日志流 tableEnv.toChangelogStream(table).print();
4.6.3、支持的数据类型
整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。
1. 原子类型
在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv = ...; DataStream<Long> stream = ...; // 将数据流转换成动态表,动态表只有一个字段,重命名为myLong Table table = tableEnv.fromDataStream(stream, $("myLong"));
2. Tuple类型
当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。
Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。
StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, Integer>> stream = ...; // 将数据流转换成只包含f1字段的表 Table table = tableEnv.fromDataStream(stream, $("f1")); // 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换 Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); // 将f1字段命名为myInt,f0命名为myLong Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
3. POJO 类型
Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。
将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。
StreamTableEnvironment tableEnv = ...; DataStream<Event> stream = ...; Table table = tableEnv.fromDataStream(stream); Table table = tableEnv.fromDataStream(stream, $("user")); Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
4. Row类型
Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。
Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。
4.6.4、综合应用示例
package com.lyh.sql; import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class TableStreamDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 2L, 2), new WaterSensor("s2", 2L, 21), new WaterSensor("s3", 3L, 3), new WaterSensor("s3", 4L, 4) ); // TODO 1.创建表环境 StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 1. 流 -> 表 // 属性名 就是表的 字段名 Table sensorTable = table_env.fromDataStream(sensorDS); // 或者指定保留哪些字段 // table_env.fromDataStream(sensorDS,$("id")); // 注册 table_env.createTemporaryView("sensor",sensorTable); Table result = table_env.sqlQuery("select id,sum(vc) from sensor group by id"); Table filter = table_env.sqlQuery("select id,ts,vc from sensor where ts > 2"); // TODO 2. 表 -> 流 // 2.1 追加流 table_env.toDataStream(filter,WaterSensor.class).print("filter"); // 2.2 更新流(结果需要更新) table_env.toChangelogStream(result).print("sum"); // 只要代码中调用了 DataStream 就需要使用 execute env.execute(); } }
运行结果:
4.7 自定义函数(UDF)
系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。
Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:
- 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
- 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
- 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
- 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。
4.7.1、整体调用流程
要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。
(1)注册函数
注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:
// 注册函数 tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。
(2)使用Table API调用函数
在Table API中,需要使用call()方法来调用自定义函数:
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。
(3)在SQL中调用函数
当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。
4.7.2、标量函数(Scalar Functions)
自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。
想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。
这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。
下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。
package com.lyh.sql; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.functions.ScalarFunction; // TODO 自定义函数的实现类 public class MyHashFunction extends ScalarFunction { // 接受任意类型的输入 返回int类型输出 public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){ return o.hashCode(); } }
package com.lyh.sql; import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class ScalarFunctionDemo { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<WaterSensor> sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 2L, 2), new WaterSensor("s2", 2L, 21), new WaterSensor("s3", 3L, 3), new WaterSensor("s3", 4L, 4) ); // TODO 1.创建表环境 StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 流 -> 表 // 属性名 就是表的 字段名 Table sensorTable = table_env.fromDataStream(sensorDS); table_env.createTemporaryView("sensor",sensorTable); //TODO 2. 注册函数 table_env.createTemporaryFunction("hashFunction",MyHashFunction.class); // TODO 3. 调用自定义函数 // 3.1 sql 用法 table_env.sqlQuery("select hashFunction(id) from sensor") // 调用了 sql 的 execute 就不需要 env.execute() .execute() .print(); // 3.2 table api 用法 sensorTable // hashFunction的eval方法有注解的原因就是因为要告诉这里的参数类型 .select(call("hashFunction",$("id"))) .execute() .print(); } }
运行结果:
这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。
另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)https://developer.aliyun.com/article/1532340