Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337

4.5、输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。

在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
 
// 经过查询转换,得到结果表
Table result = ...
 
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
 
public class SqlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // TODO 1.创建表环境
        // 1.1 写法1:
//        EnvironmentSettings es = EnvironmentSettings.newInstance()
//                .inStreamingMode()
//                .build();
//        TableEnvironment table_env = TableEnvironment.create(es);
 
        // 1.2 写法2:
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
 
        // TODO 2. 创建表
        table_env.executeSql("CREATE TABLE source ( \n" +
                "    id INT, \n" +
                "    ts BIGINT, \n" +
                "    vc INT\n" +
                ") WITH ( \n" +
                "    'connector' = 'datagen', \n" +
                "    'rows-per-second'='1', \n" +
                "    'fields.id.kind'='random', \n" +
                "    'fields.id.min'='1', \n" +
                "    'fields.id.max'='10', \n" +
                "    'fields.ts.kind'='sequence', \n" +
                "    'fields.ts.start'='1', \n" +
                "    'fields.ts.end'='1000000', \n" +
                "    'fields.vc.kind'='random', \n" +
                "    'fields.vc.min'='1', \n" +
                "    'fields.vc.max'='100'\n" +
                ");\n");
        table_env.executeSql("CREATE TABLE sink (\n" +
                "    id INT, \n" +
                "    sum_vc INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");
 
        // TODO 3. 执行查询 查询的结果也是一张表
        // 3.1 使用sql进行查询
        Table table = table_env.sqlQuery("select id,sum(vc) as sum_vc from source where id > 5 group by id;");
        // 把 table 对象注册成为表名
        table_env.createTemporaryView("tmp",table);
//        table_env.sqlQuery("select * from tmp;");
        // 3.2 用table api查询
//        Table source = table_env.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sum_vc"))
//                .select($("id"), $("sum_vc"));
 
        // TODO 4. 输出表
        // 4.1 sql用法
        table_env.executeSql("insert into sink select * from tmp");
        // 4.2 table api
//        result.executeInsert("sink");
    }
}

运行结果:

4.6 表和流的转换

既然都不用 SQL 了,我们不可能用 API 只是为了没事找事干,而是为了方便结合一些底层的 API,比如我们之前学的 DataSream API。

4.6.1、将流(DataStream)转换成表(Table)

1. 调用fromDataStream()方法

想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// 读取数据源
SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...)
 
// 将数据流转换成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);

由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。

另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

也可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

(2)调用createTemporaryView()方法

      调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。

       对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

这样,我们接下来就可以直接在SQL中引用表sensorTable了。

4.6.2、将表(Table)转换成流(DataStream)

(1)调用toDataStream()方法

将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:

tableEnv.toDataStream(table).print();

(2)调用toChangelogStream()方法

urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中需要调用的是表环境的toChangelogStream()方法:

Table table = tableEnv.sqlQuery(
    "SELECT id, sum(vc) " +
    "FROM source " +
    "GROUP BY id "
  );
 
// 将表转换成更新日志流
tableEnv.toChangelogStream(table).print();

4.6.3、支持的数据类型

整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。

1. 原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Long> stream = ...;
 
// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
2. Tuple类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。

Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Tuple2<Long, Integer>> stream = ...;
 
// 将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
 
// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
 
// 将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
3. POJO 类型

Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。

将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Event> stream = ...;
 
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
4. Row类型

Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。

Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。

4.6.4、综合应用示例

package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
 
public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );
 
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
 
        // TODO 1. 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        // 或者指定保留哪些字段
//        table_env.fromDataStream(sensorDS,$("id"));
        // 注册
        table_env.createTemporaryView("sensor",sensorTable);
 
        Table result = table_env.sqlQuery("select id,sum(vc) from sensor group by id");
        Table filter = table_env.sqlQuery("select id,ts,vc from sensor where ts > 2");
 
        // TODO 2. 表 -> 流
        // 2.1 追加流
        table_env.toDataStream(filter,WaterSensor.class).print("filter");
        // 2.2 更新流(结果需要更新)
        table_env.toChangelogStream(result).print("sum");
 
        // 只要代码中调用了 DataStream 就需要使用 execute
        env.execute();
    }
}

运行结果:

4.7 自定义函数(UDF)

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。

Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:

  1. 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
  2. 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
  3. 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
  4. 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。

4.7.1、整体调用流程

要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。

(1)注册函数

注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。

(2)使用Table API调用函数

在Table API中,需要使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。

(3)在SQL中调用函数

当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。

4.7.2、标量函数(Scalar Functions

自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。

想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。

这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。

下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。

package com.lyh.sql;
 
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
 
// TODO 自定义函数的实现类
public class MyHashFunction extends ScalarFunction {
    // 接受任意类型的输入 返回int类型输出
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){
        return o.hashCode();
    }
}
package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );
 
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        table_env.createTemporaryView("sensor",sensorTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("hashFunction",MyHashFunction.class);
 
        // TODO 3. 调用自定义函数
        // 3.1 sql 用法
        table_env.sqlQuery("select hashFunction(id) from sensor")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
        // 3.2 table api 用法
        sensorTable
                // hashFunction的eval方法有注解的原因就是因为要告诉这里的参数类型
                .select(call("hashFunction",$("id")))
                .execute()
                .print();
 
    }
}

运行结果:

这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。

另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)https://developer.aliyun.com/article/1532340

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
199 15
|
4天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
53 14
|
4月前
|
存储 API 网络架构
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
【Azure 存储服务】调用REST API获取Stroage Account Table中所有的Entity计数 -- Count
|
4月前
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
70 2
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
56 0
|
3月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
91 2
|
3月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
52 1
|
4月前
|
存储 JSON API
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
【Azure 存储服务】使用REST API操作Azure Storage Table,删除数据(Delete Entity)
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之通过 SQL 改写实现状态复用如何解决
Flink SQL 在快手实践问题之通过 SQL 改写实现状态复用如何解决
56 2
下一篇
DataWorks