Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)https://developer.aliyun.com/article/1532339
4.7.3、表函数(Table Functions)
跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。
类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。
在SQL中调用表函数,需要使用LATERAL TABLE(<TableFunction>)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。
下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。
package com.lyh.sql; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; @FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>")) public class MySplitFunction extends TableFunction<Row> { // 返回是 void ,用 collect 方法输出 public void eval(String str){ for (String word : str.split(" ")) { collect(Row.of(word,word.length())); } } }
package com.lyh.sql; import com.lyh.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class ScalarFunctionDemo2 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> wordsDS = env.fromElements( "hello flink", "hello hadoop", "hello kafka", "hello spark", "hello hive and impala" ); // TODO 1.创建表环境 StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 流 -> 表 // 属性名 就是表的 字段名 Table sensorTable = table_env.fromDataStream(wordsDS,$("name"));// 字段名 table_env.createTemporaryView("words",sensorTable); //TODO 2. 注册函数 table_env.createTemporaryFunction("splitFunction",MySplitFunction.class); // TODO 3. 调用自定义函数 // 交叉用法 table_env.sqlQuery("select word,length from words,lateral table (splitFunction(name))") // 调用了 sql 的 execute 就不需要 env.execute() .execute() .print(); } }
运行结果:
左联结:
table_env .sqlQuery("select name,word,length from words left join lateral table (splitFunction(name)) on true") .execute() .print();
字段重命名:
.sqlQuery("select name,newWord,newLength from words left join lateral table (splitFunction(name)) as T(newWord,newLength) on true")
这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。
4.7.4、聚合函数(Aggregate Functions)
用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。
聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。
自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。
Flink SQL中的聚合函数的工作原理如下:
(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。
(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。
(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。
所以,每个 AggregateFunction 都必须实现以下几个方法:
- createAccumulator()
这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。
- accumulate()
这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。
- getValue()
这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。
在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。
AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。
下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。
package com.lyh.sql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.AggregateFunction; // 泛型 T:返回类型 ACC: 累加器类型<加权总和,权重总和> public class MyWeightAvg extends AggregateFunction<Double, Tuple2<Integer,Integer>> { /** * 计算累加和 * @param acc 累加器的类型 固定写法 * @param score 分神 * @param weight 权重 */ public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){ acc.f0 += score*weight; acc.f1 += weight; } @Override public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) { return integerIntegerTuple2.f0*1d/integerIntegerTuple2.f1; } @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0,0); } }
package com.lyh.sql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; public class ScalarFunctionDemo3 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple3<String,Integer,Integer>> scoreDS = env.fromElements( Tuple3.of("燕双鹰",80,3), Tuple3.of("李大喜",90,4), Tuple3.of("李大喜",88,4), Tuple3.of("狄仁杰",95,4), Tuple3.of("狄仁杰",86,4) ); // TODO 1.创建表环境 StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 流 -> 表 // 属性名 就是表的 字段名 Table scoreTable = table_env.fromDataStream(scoreDS,$("f0").as("name"),$("f1").as("score"),$("f2").as("weight"));// 字段名 table_env.createTemporaryView("scores",scoreTable); //TODO 2. 注册函数 table_env.createTemporaryFunction("weightAvg",MyWeightAvg.class); // TODO 3. 调用自定义函数 table_env .sqlQuery("select name,weightAvg(score,weight) from scores group by name") .execute() .print(); } }
运行结果:
聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。
4.7.5、表聚合函数(Table Aggregate Functions)
用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。
自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:
- createAccumulator()
- 创建累加器的方法,与AggregateFunction中用法相同。
- accumulate()
- 聚合计算的核心方法,与AggregateFunction中用法相同。
- emitValue()
所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。
表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。
具体代码如下:
package com.lyh.sql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.util.Collector; // T: 返回类型(数值,排名) ACC:累加器类型(第一大的数,第二大的数) public class MyTableAggregate extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0,0); } /** * * 每来一个数据 调用一次 判断比较大小 更新top2到acc中 * @param acc 累加器类型 * @param num 过来的数据 */ public void accumulate(Tuple2<Integer,Integer> acc,Integer num){ if (num > acc.f0){ // 新来的变第一,旧的第一变第二 acc.f1 = acc.f0; acc.f0 = num; }else if (num > acc.f1){ // 新来的变第二,旧的第二删除 acc.f1 = num; } } /** * 输出结果 (数值,排名) * @param acc 累加器类型 * @param out 采集器类型,和输出结果类型一样 */ public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){ if (acc.f0 != 0){ out.collect(Tuple2.of(acc.f0,1)); } if (acc.f1 != 0){ out.collect(Tuple2.of(acc.f1,2)); } } }
package com.lyh.sql; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; public class ScalarFunctionDemo4 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> valDS = env.fromElements(3,5,4,7,5,6,1,4,2); // TODO 1.创建表环境 StreamTableEnvironment table_env = StreamTableEnvironment.create(env); // TODO 流 -> 表 // 属性名 就是表的 字段名 Table valueTable = table_env.fromDataStream(valDS,$("value"));// 字段名 table_env.createTemporaryView("values",valueTable); //TODO 2. 注册函数 table_env.createTemporaryFunction("top2",MyTableAggregate.class); // TODO 3. 调用自定义函数: 只支持 table api valueTable .flatAggregate(call("top2",$("value")).as("value","rank")) .select($("value"),$("rank")) .execute() .print(); } }
运行结果:
+----+-------------+-------------+ | op | value | rank | +----+-------------+-------------+ | +I | 3 | 1 | | -D | 3 | 1 | | +I | 5 | 1 | | +I | 3 | 2 | | -D | 5 | 1 | | -D | 3 | 2 | | +I | 5 | 1 | | +I | 4 | 2 | | -D | 5 | 1 | | -D | 4 | 2 | | +I | 7 | 1 | | +I | 5 | 2 | | -D | 7 | 1 | | -D | 5 | 2 | | +I | 7 | 1 | | +I | 5 | 2 | | -D | 7 | 1 | | -D | 5 | 2 | | +I | 7 | 1 | | +I | 6 | 2 | | -D | 7 | 1 | | -D | 6 | 2 | | +I | 7 | 1 | | +I | 6 | 2 | | -D | 7 | 1 | | -D | 6 | 2 | | +I | 7 | 1 | | +I | 6 | 2 | | -D | 7 | 1 | | -D | 6 | 2 | | +I | 7 | 1 | | +I | 6 | 2 | +----+-------------+-------------+ 32 rows in set
目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。
这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。
总结
这节课用到不少东西:Kafka、Hive、MySQL、Flink,需要注意的地方很多,忘了的东西也很多。这里记录一下:
- 关闭 hadoop 前 把 yarn里的任务都 kill 掉(尤其是 flink 的 yarn-session)
一些启动命令:
- 启动 hive:hiveservices.sh start
- 启动 flink sql:
- bin/sqlsession -d
- bin/sql-client embeded -s yarn-session -i sql-client-init.sql
现在是 2024-01-23 22:13,终于把 Flink 完结了,从这学期的开始,耗时半年断断续续。Flink 是我难以言说的喜欢的一门课,没有缘由,希望接下来可以好好把它弄熟,更希望未来若干年的工作可以都和它打交道。
永言配命,自求多福,希望未来能有一个好的结果,浅浅期待一下吧 ~