Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)https://developer.aliyun.com/article/1532339

4.7.3、表函数(Table Functions

       跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。

       类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。

       在SQL中调用表函数,需要使用LATERAL TABLE(<TableFunction>)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。

       下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。

package com.lyh.sql;
 
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
 
@FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>"))
public class MySplitFunction extends TableFunction<Row> {
 
    // 返回是 void ,用 collect 方法输出
    public void eval(String str){
        for (String word : str.split(" ")) {
            collect(Row.of(word,word.length()));
        }
    }
}
package com.lyh.sql;
 
import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo2 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<String> wordsDS = env.fromElements(
                "hello flink",
                "hello hadoop",
                "hello kafka",
                "hello spark",
                "hello hive and impala"
        );
        
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(wordsDS,$("name"));// 字段名
        table_env.createTemporaryView("words",sensorTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("splitFunction",MySplitFunction.class);
 
        // TODO 3. 调用自定义函数
        // 交叉用法
        table_env.sqlQuery("select word,length from words,lateral table (splitFunction(name))")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
 
    }
}

运行结果:

左联结:

table_env
     .sqlQuery("select name,word,length from words left join lateral table (splitFunction(name)) on true")
      .execute()
      .print();

字段重命名:

.sqlQuery("select name,newWord,newLength from words left join lateral table (splitFunction(name)) as T(newWord,newLength) on true")

       这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。

4.7.4、聚合函数(Aggregate Functions

用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。

聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。

自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。

Flink SQL中的聚合函数的工作原理如下:

(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。

(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。

(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。

所以,每个 AggregateFunction 都必须实现以下几个方法:

  1. createAccumulator()

这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。

  1. accumulate()

这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。

  1. getValue()

这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。

在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。

AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。

下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。

package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;
 
// 泛型 T:返回类型 ACC: 累加器类型<加权总和,权重总和>
public class MyWeightAvg extends AggregateFunction<Double, Tuple2<Integer,Integer>> {
 
    /**
     * 计算累加和
     * @param acc 累加器的类型 固定写法
     * @param score 分神
     * @param weight 权重
     */
    public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){
        acc.f0 += score*weight;
        acc.f1 += weight;
    }
 
    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        return integerIntegerTuple2.f0*1d/integerIntegerTuple2.f1;
    }
 
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
}
package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
 
public class ScalarFunctionDemo3 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<Tuple3<String,Integer,Integer>> scoreDS = env.fromElements(
                Tuple3.of("燕双鹰",80,3),
                Tuple3.of("李大喜",90,4),
                Tuple3.of("李大喜",88,4),
                Tuple3.of("狄仁杰",95,4),
                Tuple3.of("狄仁杰",86,4)
        );
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table scoreTable = table_env.fromDataStream(scoreDS,$("f0").as("name"),$("f1").as("score"),$("f2").as("weight"));// 字段名
        table_env.createTemporaryView("scores",scoreTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("weightAvg",MyWeightAvg.class);
 
        // TODO 3. 调用自定义函数
        table_env
                .sqlQuery("select name,weightAvg(score,weight) from scores group by name")
                .execute()
                .print();
    }
}

运行结果:

聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。

4.7.5、表聚合函数(Table Aggregate Functions

       用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。

       自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:

  • createAccumulator()
  •    创建累加器的方法,与AggregateFunction中用法相同。
  • accumulate()
  •    聚合计算的核心方法,与AggregateFunction中用法相同。
  • emitValue()

   所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。

       表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。

具体代码如下:

package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
 
// T: 返回类型(数值,排名) ACC:累加器类型(第一大的数,第二大的数)
public class MyTableAggregate extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {
 
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
 
    /**
     *
     * 每来一个数据 调用一次 判断比较大小 更新top2到acc中
     * @param acc 累加器类型
     * @param num 过来的数据
     */
    public void accumulate(Tuple2<Integer,Integer> acc,Integer num){
        if (num > acc.f0){
            // 新来的变第一,旧的第一变第二
            acc.f1 = acc.f0;
            acc.f0 = num;
        }else if (num > acc.f1){
            // 新来的变第二,旧的第二删除
            acc.f1 = num;
        }
    }
 
    /**
     * 输出结果 (数值,排名)
     * @param acc 累加器类型
     * @param out 采集器类型,和输出结果类型一样
     */
    public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){
        if (acc.f0 != 0){
            out.collect(Tuple2.of(acc.f0,1));
        }
        if (acc.f1 != 0){
            out.collect(Tuple2.of(acc.f1,2));
        }
    }
}
package com.lyh.sql;
 
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
 
public class ScalarFunctionDemo4 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        DataStreamSource<Integer> valDS = env.fromElements(3,5,4,7,5,6,1,4,2);
 
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table valueTable = table_env.fromDataStream(valDS,$("value"));// 字段名
        table_env.createTemporaryView("values",valueTable);
 
        //TODO 2. 注册函数
        table_env.createTemporaryFunction("top2",MyTableAggregate.class);
 
        // TODO 3. 调用自定义函数: 只支持 table api
        valueTable
                .flatAggregate(call("top2",$("value")).as("value","rank"))
                .select($("value"),$("rank"))
                .execute()
                .print();
    }
}

运行结果:

+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           5 |           1 |
| +I |           3 |           2 |
| -D |           5 |           1 |
| -D |           3 |           2 |
| +I |           5 |           1 |
| +I |           4 |           2 |
| -D |           5 |           1 |
| -D |           4 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
+----+-------------+-------------+
32 rows in set

目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。

       这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。

总结

这节课用到不少东西:Kafka、Hive、MySQL、Flink,需要注意的地方很多,忘了的东西也很多。这里记录一下:

  • 关闭 hadoop 前 把 yarn里的任务都 kill 掉(尤其是 flink 的 yarn-session)

一些启动命令:

  • 启动 hive:hiveservices.sh start
  • 启动 flink sql:
  • bin/sqlsession -d
  • bin/sql-client embeded -s yarn-session -i sql-client-init.sql

       现在是 2024-01-23 22:13,终于把 Flink 完结了,从这学期的开始,耗时半年断断续续。Flink 是我难以言说的喜欢的一门课,没有缘由,希望接下来可以好好把它弄熟,更希望未来若干年的工作可以都和它打交道。

       永言配命,自求多福,希望未来能有一个好的结果,浅浅期待一下吧 ~

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
29天前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
140 0
|
28天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
100 0
|
28天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
104 0
|
8月前
|
SQL 关系型数据库 MySQL
Flink教程(16)- Flink Table与SQL
Flink教程(16)- Flink Table与SQL
209 0
|
2月前
|
SQL Apache 流计算
Flink table&SQL 的使用
Flink table&SQL 的使用
40 0
|
SQL IDE Java
【Flink】(十)Flink Table API 和 Flink SQL 入门
【Flink】(十)Flink Table API 和 Flink SQL 入门
286 0
|
SQL API 流计算
Flink-Table-&-SQL
简介 Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。
1166 0
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0