Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)https://developer.aliyun.com/article/1532337

4.5、输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。

在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
 
// 经过查询转换,得到结果表
Table result = ...
 
// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
 
public class SqlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        // TODO 1.创建表环境
        // 1.1 写法1:
//        EnvironmentSettings es = EnvironmentSettings.newInstance()
//                .inStreamingMode()
//                .build();
//        TableEnvironment table_env = TableEnvironment.create(es);
 
        // 1.2 写法2:
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
 
        // TODO 2. 创建表
        table_env.executeSql("CREATE TABLE source ( \n" +
                "    id INT, \n" +
                "    ts BIGINT, \n" +
                "    vc INT\n" +
                ") WITH ( \n" +
                "    'connector' = 'datagen', \n" +
                "    'rows-per-second'='1', \n" +
                "    'fields.id.kind'='random', \n" +
                "    'fields.id.min'='1', \n" +
                "    'fields.id.max'='10', \n" +
                "    'fields.ts.kind'='sequence', \n" +
                "    'fields.ts.start'='1', \n" +
                "    'fields.ts.end'='1000000', \n" +
                "    'fields.vc.kind'='random', \n" +
                "    'fields.vc.min'='1', \n" +
                "    'fields.vc.max'='100'\n" +
                ");\n");
        table_env.executeSql("CREATE TABLE sink (\n" +
                "    id INT, \n" +
                "    sum_vc INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");
 
        // TODO 3. 执行查询 查询的结果也是一张表
        // 3.1 使用sql进行查询
        Table table = table_env.sqlQuery("select id,sum(vc) as sum_vc from source where id > 5 group by id;");
        // 把 table 对象注册成为表名
        table_env.createTemporaryView("tmp",table);
//        table_env.sqlQuery("select * from tmp;");
        // 3.2 用table api查询
//        Table source = table_env.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sum_vc"))
//                .select($("id"), $("sum_vc"));
 
        // TODO 4. 输出表
        // 4.1 sql用法
        table_env.executeSql("insert into sink select * from tmp");
        // 4.2 table api
//        result.executeInsert("sink");
    }
}

运行结果:

4.6 表和流的转换

既然都不用 SQL 了,我们不可能用 API 只是为了没事找事干,而是为了方便结合一些底层的 API,比如我们之前学的 DataSream API。

4.6.1、将流(DataStream)转换成表(Table)

1. 调用fromDataStream()方法

想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// 读取数据源
SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...)
 
// 将数据流转换成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);

由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。

另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

也可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

(2)调用createTemporaryView()方法

      调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。

       对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

这样,我们接下来就可以直接在SQL中引用表sensorTable了。

4.6.2、将表(Table)转换成流(DataStream)

(1)调用toDataStream()方法

将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:

tableEnv.toDataStream(table).print();

(2)调用toChangelogStream()方法

urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中需要调用的是表环境的toChangelogStream()方法:

Table table = tableEnv.sqlQuery(
    "SELECT id, sum(vc) " +
    "FROM source " +
    "GROUP BY id "
  );
 
// 将表转换成更新日志流
tableEnv.toChangelogStream(table).print();

4.6.3、支持的数据类型

整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。

1. 原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Long> stream = ...;
 
// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
2. Tuple类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。

Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Tuple2<Long, Integer>> stream = ...;
 
// 将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));
 
// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));
 
// 将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
3. POJO 类型

Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。

将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。

StreamTableEnvironment tableEnv = ...;
 
DataStream<Event> stream = ...;
 
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
4. Row类型

Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。

Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。

4.6.4、综合应用示例

package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
 
public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );
 
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
 
        // TODO 1. 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        // 或者指定保留哪些字段
//        table_env.fromDataStream(sensorDS,$("id"));
        // 注册
        table_env.createTemporaryView("sensor",sensorTable);
 
        Table result = table_env.sqlQuery("select id,sum(vc) from sensor group by id");
        Table filter = table_env.sqlQuery("select id,ts,vc from sensor where ts > 2");
 
        // TODO 2. 表 -> 流
        // 2.1 追加流
        table_env.toDataStream(filter,WaterSensor.class).print("filter");
        // 2.2 更新流(结果需要更新)
        table_env.toChangelogStream(result).print("sum");
 
        // 只要代码中调用了 DataStream 就需要使用 execute
        env.execute();
    }
}

运行结果:

4.7 自定义函数(UDF)

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。

Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:

  1. 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
  2. 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
  3. 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
  4. 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。

4.7.1、整体调用流程

要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。

(1)注册函数

注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。

(2)使用Table API调用函数

在Table API中,需要使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。

(3)在SQL中调用函数

当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。

4.7.2、标量函数(Scalar Functions

自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。

想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。

这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。

下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。

package com.lyh.sql;
 
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
 
// TODO 自定义函数的实现类
public class MyHashFunction extends ScalarFunction {
    // 接受任意类型的输入 返回int类型输出
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){
        return o.hashCode();
    }
}
package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );
 
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        table_env.createTemporaryView("sensor",sensorTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("hashFunction",MyHashFunction.class);
 
        // TODO 3. 调用自定义函数
        // 3.1 sql 用法
        table_env.sqlQuery("select hashFunction(id) from sensor")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
        // 3.2 table api 用法
        sensorTable
                // hashFunction的eval方法有注解的原因就是因为要告诉这里的参数类型
                .select(call("hashFunction",$("id")))
                .execute()
                .print();
 
    }
}

运行结果:

这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。

另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)https://developer.aliyun.com/article/1532340

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
272 9
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
459 21
|
9月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
704 12
Flink CDC YAML:面向数据集成的 API 设计
|
9月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
787 7
Flink Materialized Table:构建流批一体 ETL
|
8月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
338 5
|
8月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
151 3
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之使用API调用ODPS SQL时,出现资源被定时任务抢占,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
202 32
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
API Java 数据库连接
从平凡到卓越:Hibernate Criteria API 让你的数据库查询瞬间高大上,彻底告别复杂SQL!
【8月更文挑战第31天】构建复杂查询是数据库应用开发中的常见需求。Hibernate 的 Criteria API 以其强大和灵活的特点,允许开发者以面向对象的方式构建查询逻辑,同时具备 SQL 的表达力。本文将介绍 Criteria API 的基本用法并通过示例展示其实际应用。此 API 通过 API 构建查询条件而非直接编写查询语句,提高了代码的可读性和安全性。无论是简单的条件过滤还是复杂的分页和连接查询,Criteria API 均能胜任,有助于提升开发效率和应用的健壮性。
443 0
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章