Flink SQL Functions

Summary: user-defined function categories, basic usage, UDF implementation essentials, and implementation essentials for UDTF, UDAF, and UDATF.

Built-in Functions


The Flink Table API/SQL ships with a large number of built-in functions:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/systemfunctions/
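
Built-in functions need no registration and can be called directly; a minimal sketch, assuming a registered view users with a STRING column name:

// call built-in functions directly in SQL
tEnv.sqlQuery("SELECT UPPER(name) AS uname, CHAR_LENGTH(name) AS name_len FROM users")
        .execute()
        .print();
// the Table API exposes the same functions as expression methods
tEnv.from("users")
        .select($("name").upperCase(), $("name").charLength())
        .execute()
        .print();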


User-Defined Functions


Categories

Mention user-defined functions and anyone who has used HiveSQL will think of UDF, UDAF, and UDTF. Flink Table API/SQL does not use those exact terms, but it provides direct counterparts.


Flink Table API/SQL provides the following kinds of functions:


| Function type | Hive counterpart | Description | Input | Output |
| ------------- | ---------------- | ----------- | ----- | ------ |
| Scalar functions | UDF | one row in, one row out | 0, 1, or many scalar values | 1 scalar value |
| Table functions | UDTF | one row in, many rows out ("explode") | 1 or many scalar values | any number of rows, each of which may carry multiple fields |
| Aggregate functions | UDAF | many rows in, one row out | scalar values across multiple rows | 1 scalar value |
| Table aggregate functions | UDATF | many rows in, many rows out (aggregate first, then explode); think of the windowed Top-N scenario | | |
| Async table functions | - | asynchronous table functions, a special kind used by table sources for lookup operations (see the sketch after this table) | | |

Note: "in" and "out" here count rows, not columns.
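
Since the async category gets no example later in this article, here is a hedged sketch of what such a function looks like; ExternalClient and its get method are hypothetical placeholders for a real asynchronous lookup client:

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// sketch only: eval receives a CompletableFuture to complete asynchronously,
// instead of returning a value or calling collect
public static class AsyncLookupFunction extends AsyncTableFunction<Row> {

    private transient ExternalClient client; // hypothetical async client

    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        client.get(key).whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(Collections.singleton(Row.of(key, value)));
            }
        });
    }
}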


Registering and Using a UDF



package com.blink.sb.udfs.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
import static org.apache.flink.table.api.Expressions.row;
/**
 * Scalar function usage.
 */
public class FlinkBaseScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("introduction", DataTypes.STRING())
                ),
                row(1, "zhangsan","In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge"),
                row(2L, "lisi","In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge")
        ).select($("id"), $("name"),$("introduction"));
        tEnv.createTemporaryView("users",userinfo);
        // Option 1: call the function inline via call(...) (no registration needed)
        tEnv.from("users").select($("id"), $("name"),call(SubstringFunction.class, $("introduction"), 5, 13))
                .execute()
                .print();
        // Option 2: register the function first, then call it by the registered name
        // register the function
        tEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
        // Table API: invoke the registered UDF with call(...)
        tEnv.from("users").select($("id"), $("name"),call(
                "SubstringFunction",
                $("introduction"), 5, 13
            )
        )
                .execute()
                .print();
        // SQL usage
        Table result = tEnv.sqlQuery("SELECT id,name,SubstringFunction(introduction, 5, 13) FROM users");
        result.printSchema();
        result.execute().print();
    }
    /**
     * The simplest possible scalar function:
     */
    public static class SubstringFunction extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
}


UDF Implementation Essentials


Base Class

A UDF must extend the matching base class, e.g. ScalarFunction. The class must be declared public, non-abstract, and globally accessible, so non-static inner classes and anonymous classes are not allowed; it must also have a default constructor, because Flink needs to instantiate it and register it in the catalog.



The eval Method

You must provide a public eval method with well-defined parameters (overloading, varargs, and inheritance are all supported):


public static class SumFunction extends ScalarFunction {
    public Integer eval(Integer a, Integer b) {
        return a + b;
    }

    public Integer eval(String a, String b) {
        return Integer.valueOf(a) + Integer.valueOf(b);
    }

    public Integer eval(Double... d) {
        double result = 0;
        for (double value : d)
            result += value;
        return (int) result;
    }
}
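
As a hedged illustration of how these overloads resolve (assuming SumFunction has been registered under that name), each call below should bind to a different eval:

// INT,INT resolves to eval(Integer, Integer); STRING,STRING to eval(String, String);
// three DOUBLE arguments fall through to the varargs eval(Double...)
tEnv.sqlQuery("SELECT SumFunction(1, 2), SumFunction('3', '4'), SumFunction(1.5, 2.5, 3.0)")
        .execute()
        .print();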


Type Inference


In most cases Flink can automatically infer the types of the eval parameters and return value. For more complex cases, the @DataTypeHint and @FunctionHint annotations can assist the automatic inference:


// UDF with overloaded eval methods
public static class OverloadedFunction extends ScalarFunction {

    // no hint needed
    public Long eval(long a, long b) {
        return a + b;
    }

    // declare the precision and scale of the DECIMAL result
    public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
        return BigDecimal.valueOf(a + b);
    }

    // declare a nested data type
    @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
    public Row eval(int i) {
        return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
    }

    // accept any input type and produce a custom-serialized output
    @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
    public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return MyUtils.serializeToByteBuffer(o);
    }
}

// table function with overloaded eval methods
// the output type (return type) is declared once for the whole function
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

    public void eval(int a, int b) {
        collect(Row.of("Sum", a + b));
    }

    public void eval() {
        collect(Row.of("Empty args", -1));
    }
}



Type inference can also be decoupled from the eval methods entirely, so that it is determined solely by the function hints:


@FunctionHint(
    input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
    output = @DataTypeHint("INT")
)
@FunctionHint(
    input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
    output = @DataTypeHint("BIGINT")
)
@FunctionHint(
    input = {},
    output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {
    public void eval(Object... o) {
        if (o.length == 0) {
            collect(false);
        }
        collect(o[0]);
    }
}


Custom Type Inference

In most cases Flink can infer types on its own, and the hints cover almost everything else. In special cases you can take over type inference completely by overriding the getTypeInference method:


public static class LiteralFunction extends ScalarFunction {
    public Object eval(String s, String type) {
        switch (type) {
            case "INT":
                return Integer.valueOf(s);
            case "DOUBLE":
                return Double.valueOf(s);
            case "STRING":
            default:
                return s;
        }
    }

    // once getTypeInference is overridden, the reflection-based type inference
    // is disabled and replaced by this logic
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // declare the argument types
                .typedArguments(DataTypes.STRING(), DataTypes.STRING())
                // declare a strategy for the function's result data type
                .outputTypeStrategy(callContext -> {
                    if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
                        throw callContext.newValidationError("Literal expected for second argument.");
                    }
                    // return a data type based on the literal
                    final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
                    switch (literal) {
                        case "INT":
                            return Optional.of(DataTypes.INT().notNull());
                        case "DOUBLE":
                            return Optional.of(DataTypes.DOUBLE().notNull());
                        case "STRING":
                        default:
                            return Optional.of(DataTypes.STRING());
                    }
                })
                .build();
    }
}
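
A hedged usage sketch: because the result type is derived from the literal second argument, the same function can produce differently typed columns in a single query:

tEnv.createTemporarySystemFunction("LiteralFunction", LiteralFunction.class);
// i is INT NOT NULL, d is DOUBLE NOT NULL, s is STRING, as decided by getTypeInference
tEnv.sqlQuery("SELECT LiteralFunction('42', 'INT') AS i, "
        + "LiteralFunction('1.5', 'DOUBLE') AS d, "
        + "LiteralFunction('hello', 'STRING') AS s")
        .execute()
        .print();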


Runtime Integration

The open and close methods of the UDF base classes can be overridden to implement custom initialization and cleanup logic. The open method receives a FunctionContext, through which various pieces of runtime information can be obtained:


| Method | Purpose |
| ------ | ------- |
| getMetricGroup() | metric group of the current subtask |
| getCachedFile(name) | local temporary copy of a distributed-cache file |
| getJobParameter(name, defaultValue) | job parameter for the given key |
| getExternalResourceInfos(resourceName) | information about external resources |

public static class HashCodeFunction extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
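
The hashcode_factor key read in open has to be supplied before the job runs. A minimal sketch (reusing the users view from the earlier example):

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// make the key visible to FunctionContext.getJobParameter at runtime
tEnv.getConfig().addJobParameter("hashcode_factor", "31");
tEnv.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
tEnv.sqlQuery("SELECT name, hashCode(name) FROM users").execute().print();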




Parameterized Scalar Function

package com.blink.sb.udfs.udf;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.*;
import static org.apache.flink.table.api.Expressions.row;

/**
 * Parameterized scalar function usage.
 */
public class FlinkParameterizableScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("introduction", DataTypes.STRING())
                ),
                row(1, "zhangsan", "In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge"),
                row(2L, "lisi", "In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge")
        ).select($("id"), $("name"), $("introduction"));
        tEnv.createTemporaryView("users", userinfo);
        // Option 1: call inline with a pre-configured instance (no registration needed)
        //tEnv.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 13));
        tEnv.from("users").select(call(new SubstringFunction(true), $("introduction"), 5, 13))
                .execute()
                .print();
        // Option 2: register a configured instance first, then call it by the registered name
        //tEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
        tEnv.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));
        // Table API: invoke the registered UDF with call(...)
        tEnv.from("users").select(call("SubstringFunction", $("introduction"), 5, 13))
                .execute()
                .print();
        // SQL usage
        tEnv.sqlQuery("SELECT SubstringFunction(introduction, 5, 13) FROM users")
                .execute()
                .print();
    }

    /**
     * Parameterized scalar function.
     */
    public static class SubstringFunction extends ScalarFunction {
        private boolean endInclusive;

        public SubstringFunction(boolean endInclusive) {
            this.endInclusive = endInclusive;
        }

        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, endInclusive ? end + 1 : end);
        }
    }
}

Wildcard Scalar Function


package com.blink.sb.udfs.udf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Wildcard scalar function usage.
 */
public class FlinkWildcardScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT())
                ),
                row(1, "zhangsan", 23),
                row(2L, "lisi", 18)
        ).select($("id"), $("name"), $("age"));
        tEnv.createTemporaryView("users", userinfo);
        // Option 1: call inline (no registration needed); $("*") passes every column
        tEnv.from("users").select(call(MyConcatFunction.class, $("*")))
                .execute()
                .print();
        // Option 2: register first, then call by the registered name
        tEnv.createTemporarySystemFunction("MyConcatFunction", new MyConcatFunction());
        // Table API: invoke the registered UDF with call(...)
        tEnv.from("users").select(call("MyConcatFunction", $("id"), $("name")))
                .execute()
                .print();
        // SQL usage. Note: a UDF with wildcard parameter types cannot be used from
        // Flink SQL; both statements below fail at runtime (uncomment to reproduce).
        // tEnv.sqlQuery("SELECT MyConcatFunction(*) FROM users")
        //         .execute()
        //         .print();
        // tEnv.sqlQuery("SELECT MyConcatFunction(id, name) FROM users")
        //         .execute()
        //         .print();
    }

    /**
     * Scalar function accepting any number of arguments of any type.
     */
    public static class MyConcatFunction extends ScalarFunction {
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... fields) {
            return Arrays.stream(fields)
                    .map(Object::toString)
                    .collect(Collectors.joining(","));
        }
    }
}

UDTF


  • Table functions take one row in and emit many rows out ("explode"). Extend TableFunction, provide an eval method with no return value, and emit rows with collect.


  • The result of a table function is itself a table, which has to be joined back to the original table to produce the final result; this is what lateral tables are for (look up LATERAL TABLE if the concept is unfamiliar).


The requirement:


A student's scores all sit in a single field:

1  zhangsan  Chinese:90,Math:74,English:100

and need to be split into one row per subject:

1  zhangsan  Chinese  90
1  zhangsan  Math     74
1  zhangsan  English  100


package com.blink.sb.udfs.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Table function usage.
 */
public class FlinkTableFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("scores", DataTypes.STRING())
                ),
                row(1, "zhangsan", "Chinese:90,Math:74,English:100"),
                row(2L, "lisi", "Chinese:86,Math:99,English:92")
        ).select($("id"), $("name"), $("scores"));
        tEnv.createTemporaryView("scoresTable", scores);
        // register the function
        tEnv.createTemporarySystemFunction("ScoresSplitFunction", new ScoresSplitFunction());
        // Table API: calling the function directly in select(), as below, does NOT work:
        //        tEnv.from("scoresTable")
        //                .select($("id"), $("name"), call("ScoresSplitFunction", $("scores")))
        //                .execute()
        //                .print();
        // it must be paired with joinLateral (inner join) or leftOuterJoinLateral (left outer join)
        tEnv.from("scoresTable")
                .joinLateral(call(ScoresSplitFunction.class, $("scores"))
                        .as("subject", "score")
                )
                .select($("id"), $("name"), $("subject"), $("score"))
                .execute()
                .print();
        tEnv.from("scoresTable")
                .leftOuterJoinLateral(call(ScoresSplitFunction.class, $("scores"))
                        .as("subject", "score")
                )
                .select($("id"), $("name"), $("subject"), $("score"))
                .execute()
                .print();
        // SQL usage
        tEnv.sqlQuery("SELECT id, name, subject, score " + // exploded field names must match those declared by the UDF
                "FROM scoresTable," +                      // note the comma
                "LATERAL TABLE(ScoresSplitFunction(scores))")
                .execute()
                .print();
        tEnv.sqlQuery("SELECT id, name, subject1, score1 " +
                "FROM scoresTable " +
                "LEFT JOIN LATERAL TABLE(ScoresSplitFunction(scores)) AS sc(subject1, score1) ON TRUE")
                .execute()
                .print();
    }

    /**
     * A student's scores all sit in one field:
     *     1 zhangsan Chinese:90,Math:74,English:100
     * Split them into one row per subject:
     *     1 zhangsan Chinese 90
     *     1 zhangsan Math    74
     *     1 zhangsan English 100
     */
    @FunctionHint(output = @DataTypeHint("ROW<subject STRING, score INT>"))
    public static class ScoresSplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(",")) {
                String[] arr = s.split(":");
                // emit a new row with collect
                collect(Row.of(arr[0], Integer.parseInt(arr[1])));
            }
        }
    }
}



UDAF


Aggregate functions map the scalar values of multiple rows to a new scalar value (many rows in, one row out). Aggregation is driven by an accumulator; the figure below shows the process:


[figure omitted: the accumulator-based aggregation process]


  • Extend AggregateFunction.
  • createAccumulator (creates the empty accumulator at initialization) and getValue (retrieves the final result) must be overridden.
  • Provide an accumulate method; it is called once for every incoming row to update the accumulator.
  • A retract method is only mandatory for OVER windows.
  • merge is required for bounded aggregations and for session and sliding window aggregations (it is also good for performance).



Requirement: compute each subject's average score.

Input:
+------+----------+---------+-------+
|   id |     name | subject | score |
+------+----------+---------+-------+
| 1.00 | zhangsan | Chinese |    90 |
| 1.00 | zhangsan | Math    |    74 |
| 1.00 | zhangsan | English |   100 |
| 2.00 | lisi     | Chinese |    86 |
| 2.00 | lisi     | Math    |    99 |
| 2.00 | lisi     | English |    92 |
+------+----------+---------+-------+

Output:
+---------+-----------+
| subject | score_avg |
+---------+-----------+
| Chinese |      88.0 |
| Math    |      86.5 |
| English |      96.0 |
+---------+-----------+
package com.blink.sb.udfs.udaf;

import lombok.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Aggregate function usage.
 */
public class FlinkAggFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("subject", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", "90"),
                row(1, "zhangsan", "Math", "74"),
                row(1, "zhangsan", "English", "100"),
                row(2L, "lisi", "Chinese", "86"),
                row(2L, "lisi", "Math", "99"),
                row(2L, "lisi", "English", "92")
        ).select($("id"), $("name"), $("subject"), $("score"));
        tEnv.createTemporaryView("scoresTable", scores);
        // register the function
        tEnv.createTemporarySystemFunction("AvgFunction", new AvgFunction());
        // Table API: invoke the registered UDF with call(...)
        tEnv.from("scoresTable")
                .groupBy($("subject"))
                .select($("subject"), call("AvgFunction", $("score")).as("score_avg"))
                .execute()
                .print();
        // SQL usage
        tEnv.sqlQuery("SELECT subject, AvgFunction(score) AS score_avg FROM scoresTable GROUP BY subject")
                .execute()
                .print();
    }

    /**
     * Mutable accumulator data structure:
     * the average is sum / count.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AvgAccumulator {
        public double sum = 0.0;
        public int count = 0;
    }

    public static class AvgFunction extends AggregateFunction<Double, AvgAccumulator> {
        @Override
        public Double getValue(AvgAccumulator accumulator) {
            if (accumulator.count == 0) {
                return null;
            } else {
                return accumulator.sum / accumulator.count;
            }
        }

        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }

        public void accumulate(AvgAccumulator acc, Double price) {
            acc.setSum(acc.sum + price);
            acc.setCount(acc.count + 1);
        }

        // only mandatory for OVER windows
        //        public void retract(AvgAccumulator acc, Double price) {
        //            acc.setSum(acc.sum - price);
        //            acc.setCount(acc.count - 1);
        //        }

        // required for bounded aggregations and for session/sliding window aggregations
        // (also good for performance)
        //        public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> it) {
        //            for (AvgAccumulator a : it) {
        //                acc.setSum(acc.sum + a.getSum());
        //                acc.setCount(acc.count + a.getCount());
        //            }
        //        }

        public void resetAccumulator(AvgAccumulator acc) {
            acc.count = 0;
            acc.sum = 0.0d;
        }
    }
}


UDATF


Table aggregate functions take many rows in and emit many rows out (aggregate first, then explode). The figure below shows the aggregation process:


[figure omitted: the table-aggregation process]




  1. Extend TableAggregateFunction.
  2. createAccumulator must be overridden.
  3. Provide one or more accumulate methods (usually just one) that implement the accumulator-update logic.
  4. Provide emitValue(...) or emitUpdateWithRetract(...) to implement the result-emission logic.
  5. A retract method is only mandatory for OVER windows.
  6. merge is required for bounded aggregations and for session and sliding window aggregations (it is also good for performance).
  7. emitValue is mandatory for bounded window aggregations; in unbounded scenarios emitUpdateWithRetract can improve performance (a hedged sketch follows the full example below). If the accumulator must hold large state, use org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView so that the data lives in the Flink state backend (see the sketch right after this list).
  8. Must be used together with flatAggregate.
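
For the large-state case in point 7, a minimal sketch of a state-backed accumulator (the class and field names are illustrative; ListView is org.apache.flink.table.api.dataview.ListView):

// illustrative accumulator: the scores list is backed by the Flink state backend
// rather than held entirely on the heap
public static class LargeStateAccumulator {
    public ListView<Double> scores = new ListView<>();
    // e.g. inside accumulate(...): acc.scores.add(value);
}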





Requirement: find the top two scores in each subject.


Input:
+----+----------+---------+-------+
| id | name     | subject | score |
+----+----------+---------+-------+
|  2 | lisi     | Chinese |  86.0 |
|  2 | lisi     | Math    |  96.0 |
|  2 | lisi     | English |  92.0 |
|  1 | zhangsan | Chinese |  90.0 |
|  1 | zhangsan | Math    |  74.0 |
|  1 | zhangsan | English |  88.0 |
|  3 | mary     | Chinese |  59.0 |
|  3 | mary     | Math    |  99.0 |
|  3 | mary     | English | 100.0 |
+----+----------+---------+-------+

Output:
+----+---------+-------+------+
| op | subject | score | rank |
+----+---------+-------+------+
| +I | Math    |  99.0 |    1 |
| +I | Math    |  96.0 |    2 |
| +I | Chinese |  90.0 |    1 |
| +I | Chinese |  86.0 |    2 |
| +I | English | 100.0 |    1 |
| +I | English |  92.0 |    2 |
+----+---------+-------+------+


The implementation:

package com.blink.sb.udfs.udatf;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Table aggregate function usage.
 */
public class FlinkUdatfFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("subject", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", "90"),
                row(1, "zhangsan", "Math", "74"),
                row(1, "zhangsan", "English", "88"),
                row(2, "lisi", "Chinese", "86"),
                row(2, "lisi", "Math", "96"),
                row(2, "lisi", "English", "92"),
                row(3, "mary", "Chinese", "59"),
                row(3, "mary", "Math", "99"),
                row(3, "mary", "English", "100")
        ).select($("id"), $("name"), $("subject"), $("score"));
        scores.execute().print();
        tEnv.createTemporaryView("scoresTable", scores);
        // register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // Table API: invoke the registered UDF with call(...)
        tEnv.from("scoresTable")
                .groupBy($("subject")) // groupBy is not mandatory
                // a table aggregate function must be called inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("subject"), $("score"), $("rank"))
//                .select($("score"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Mutable accumulator data structure.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Top2Accumulator {
        // why not keep the full record for each top-n row?
        /**
         * Top-1 value.
         */
        public Double topOne = Double.MIN_VALUE;
        /**
         * Top-2 value.
         */
        public Double topTwo = Double.MIN_VALUE;
    }

    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        public void accumulate(Top2Accumulator acc, Double value) {
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.MIN_VALUE) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.MIN_VALUE) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}
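
For the unbounded scenario from point 7, here is a hedged sketch of emitUpdateWithRetract applied to the same Top-2 logic; it is not part of the example above. It assumes an extended accumulator that also remembers the values emitted last time (the oldTopOne/oldTopTwo fields are additions for this sketch), so only changed rows are retracted and re-emitted:

public static class Top2RetractAccumulator {
    public double topOne = Double.MIN_VALUE;
    public double topTwo = Double.MIN_VALUE;
    // values last sent downstream (sketch-only fields)
    public double oldTopOne = Double.MIN_VALUE;
    public double oldTopTwo = Double.MIN_VALUE;
}

public static class Top2WithRetractFunc
        extends TableAggregateFunction<Tuple2<Double, Integer>, Top2RetractAccumulator> {

    @Override
    public Top2RetractAccumulator createAccumulator() {
        return new Top2RetractAccumulator();
    }

    public void accumulate(Top2RetractAccumulator acc, Double value) {
        if (value > acc.topOne) {
            acc.topTwo = acc.topOne;
            acc.topOne = value;
        } else if (value > acc.topTwo) {
            acc.topTwo = value;
        }
    }

    // emit only the changes: retract the stale row, then emit its replacement
    public void emitUpdateWithRetract(Top2RetractAccumulator acc,
                                      RetractableCollector<Tuple2<Double, Integer>> out) {
        if (acc.topOne != acc.oldTopOne) {
            if (acc.oldTopOne != Double.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldTopOne, 1)); // withdraw the previous rank-1 row
            }
            out.collect(Tuple2.of(acc.topOne, 1));
            acc.oldTopOne = acc.topOne;
        }
        if (acc.topTwo != acc.oldTopTwo) {
            if (acc.oldTopTwo != Double.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldTopTwo, 2)); // withdraw the previous rank-2 row
            }
            out.collect(Tuple2.of(acc.topTwo, 2));
            acc.oldTopTwo = acc.topTwo;
        }
    }
}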

