Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)

简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)https://developer.aliyun.com/article/1532339

4.7.3、表函数(Table Functions

       跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。

       类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。

       在SQL中调用表函数,需要使用LATERAL TABLE(<TableFunction>)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。

       下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。

package com.lyh.sql;
 
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
 
@FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>"))
public class MySplitFunction extends TableFunction<Row> {
 
    // 返回是 void ,用 collect 方法输出
    public void eval(String str){
        for (String word : str.split(" ")) {
            collect(Row.of(word,word.length()));
        }
    }
}
package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo2 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<String> wordsDS = env.fromElements(
                "hello flink",
                "hello hadoop",
                "hello kafka",
                "hello spark",
                "hello hive and impala"
        );
        
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(wordsDS,$("name"));// 字段名
        table_env.createTemporaryView("words",sensorTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("splitFunction",MySplitFunction.class);
 
        // TODO 3. 调用自定义函数
        // 交叉用法
        table_env.sqlQuery("select word,length from words,lateral table (splitFunction(name))")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
 
    }
}

运行结果:

左联结:

table_env
     .sqlQuery("select name,word,length from words left join lateral table (splitFunction(name)) on true")
      .execute()
      .print();

字段重命名:

.sqlQuery("select name,newWord,newLength from words left join lateral table (splitFunction(name)) as T(newWord,newLength) on true")

       这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。

4.7.4、聚合函数(Aggregate Functions

用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。

聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。

自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。

Flink SQL中的聚合函数的工作原理如下:

(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。

(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。

(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。

所以,每个 AggregateFunction 都必须实现以下几个方法:

  1. createAccumulator()

这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。

  1. accumulate()

这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。

  1. getValue()

这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。

在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。

AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。

下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。

package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;
 
// 泛型 T:返回类型 ACC: 累加器类型<加权总和,权重总和>
public class MyWeightAvg extends AggregateFunction<Double, Tuple2<Integer,Integer>> {
 
    /**
     * 计算累加和
     * @param acc 累加器的类型 固定写法
     * @param score 分神
     * @param weight 权重
     */
    public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){
        acc.f0 += score*weight;
        acc.f1 += weight;
    }
 
    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        return integerIntegerTuple2.f0*1d/integerIntegerTuple2.f1;
    }
 
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
}
package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
 
public class ScalarFunctionDemo3 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<Tuple3<String,Integer,Integer>> scoreDS = env.fromElements(
                Tuple3.of("燕双鹰",80,3),
                Tuple3.of("李大喜",90,4),
                Tuple3.of("李大喜",88,4),
                Tuple3.of("狄仁杰",95,4),
                Tuple3.of("狄仁杰",86,4)
        );
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table scoreTable = table_env.fromDataStream(scoreDS,$("f0").as("name"),$("f1").as("score"),$("f2").as("weight"));// 字段名
        table_env.createTemporaryView("scores",scoreTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("weightAvg",MyWeightAvg.class);
 
        // TODO 3. 调用自定义函数
        table_env
                .sqlQuery("select name,weightAvg(score,weight) from scores group by name")
                .execute()
                .print();
    }
}

运行结果:

聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。

4.7.5、表聚合函数(Table Aggregate Functions

       用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。

       自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:

  • createAccumulator()
  •    创建累加器的方法,与AggregateFunction中用法相同。
  • accumulate()
  •    聚合计算的核心方法,与AggregateFunction中用法相同。
  • emitValue()

   所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。

       表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。

具体代码如下:

package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
 
// T: 返回类型(数值,排名) ACC:累加器类型(第一大的数,第二大的数)
public class MyTableAggregate extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {
 
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
 
    /**
     *
     * 每来一个数据 调用一次 判断比较大小 更新top2到acc中
     * @param acc 累加器类型
     * @param num 过来的数据
     */
    public void accumulate(Tuple2<Integer,Integer> acc,Integer num){
        if (num > acc.f0){
            // 新来的变第一,旧的第一变第二
            acc.f1 = acc.f0;
            acc.f0 = num;
        }else if (num > acc.f1){
            // 新来的变第二,旧的第二删除
            acc.f1 = num;
        }
    }
 
    /**
     * 输出结果 (数值,排名)
     * @param acc 累加器类型
     * @param out 采集器类型,和输出结果类型一样
     */
    public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){
        if (acc.f0 != 0){
            out.collect(Tuple2.of(acc.f0,1));
        }
        if (acc.f1 != 0){
            out.collect(Tuple2.of(acc.f1,2));
        }
    }
}
package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo4 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<Integer> valDS = env.fromElements(3,5,4,7,5,6,1,4,2);
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table valueTable = table_env.fromDataStream(valDS,$("value"));// 字段名
        table_env.createTemporaryView("values",valueTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("top2",MyTableAggregate.class);
 
        // TODO 3. 调用自定义函数: 只支持 table api
        valueTable
                .flatAggregate(call("top2",$("value")).as("value","rank"))
                .select($("value"),$("rank"))
                .execute()
                .print();
    }
}

运行结果:

+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           5 |           1 |
| +I |           3 |           2 |
| -D |           5 |           1 |
| -D |           3 |           2 |
| +I |           5 |           1 |
| +I |           4 |           2 |
| -D |           5 |           1 |
| -D |           4 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
+----+-------------+-------------+
32 rows in set

目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。

       这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。

总结

这节课用到不少东西:Kafka、Hive、MySQL、Flink,需要注意的地方很多,忘了的东西也很多。这里记录一下:

  • 关闭 hadoop 前 把 yarn里的任务都 kill 掉(尤其是 flink 的 yarn-session)

一些启动命令:

  • 启动 hive:hiveservices.sh start
  • 启动 flink sql:
  • bin/sqlsession -d
  • bin/sql-client embeded -s yarn-session -i sql-client-init.sql

       现在是 2024-01-23 22:13,终于把 Flink 完结了,从这学期的开始,耗时半年断断续续。Flink 是我难以言说的喜欢的一门课,没有缘由,希望接下来可以好好把它弄熟,更希望未来若干年的工作可以都和它打交道。

       永言配命,自求多福,希望未来能有一个好的结果,浅浅期待一下吧 ~

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
922 43
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
381 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
6月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1042 1
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
533 21
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
462 15
|
11月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
783 12
Flink CDC YAML:面向数据集成的 API 设计
|
11月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
901 7
Flink Materialized Table:构建流批一体 ETL
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2163 27
|
10月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
497 5
|
10月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
219 3

热门文章

最新文章