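Flink SQL Functions

Summary: categories of user-defined functions, basic usage, key points for implementing a UDF, and key points for implementing UDTF, UDAF, and UDATF.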

Built-in functions


Flink Table API/SQL provides a large number of built-in functions:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/systemfunctions/


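User-defined functions


Categories

Anyone who has used Hive SQL will think of UDF, UDAF, and UDTF when user-defined functions come up. Flink Table API/SQL does not deliberately use these terms, but its function kinds map onto them.


Flink Table API/SQL provides the following kinds of functions: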


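| Function kind | Hive counterpart | Description | Input | Output |
| --- | --- | --- | --- | --- |
| Scalar functions | UDF | One in, one out | 0, 1, or more scalar values | 1 scalar value |
| Table functions | UDTF | One in, many out (explode) | 1 or more scalar values | Any number of rows (each row may have several fields) |
| Aggregate functions | UDAF | Many in, one out | Scalar values of multiple rows | 1 scalar value |
| Table aggregate functions | UDATF | Many in, many out (aggregate first, then explode); think of the window top-N scenario | | |
| Async table functions | | Special functions that a table source uses to perform lookups | | |

Note: "in" and "out" count rows, not columns.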
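To make the row-based reading of "in" and "out" concrete, here is a minimal plain-Java sketch with no Flink dependency. The class RowShapes and its four methods are hypothetical stand-ins for the four function kinds; they only mimic how many rows go in and how many come out, and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for the four function kinds (not Flink APIs).
final class RowShapes {

    // Scalar: one row in, one value out.
    static String scalar(String name) {
        return name.toUpperCase(Locale.ROOT);
    }

    // Table: one row in, any number of rows out.
    static List<String> table(String csv) {
        return Arrays.asList(csv.split(","));
    }

    // Aggregate: many rows in, one value out (here: the average).
    static double aggregate(List<Double> scores) {
        double sum = 0;
        for (double s : scores) {
            sum += s;
        }
        return scores.isEmpty() ? 0.0 : sum / scores.size();
    }

    // Table aggregate: many rows in, several rows out (here: the two largest values).
    static List<Double> tableAggregate(List<Double> scores) {
        List<Double> sorted = new ArrayList<>(scores);
        sorted.sort((a, b) -> Double.compare(b, a)); // descending order
        return new ArrayList<>(sorted.subList(0, Math.min(2, sorted.size())));
    }
}
```

In all four cases a single row may carry several columns; only the number of rows differs between the kinds.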


Registering and using a UDF



package com.blink.sb.udfs.udf;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
/**
 * Using a scalar function
 */
public class FlinkBaseScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("introduction", DataTypes.STRING())
                ),
                row(1, "zhangsan","In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge"),
                row(2L, "lisi","In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge")
        ).select($("id"), $("name"),$("introduction"));
        tEnv.createTemporaryView("users",userinfo);
        // Option 1: call the function inline with call(...) (no registration needed)
        tEnv.from("users").select($("id"), $("name"),call(SubstringFunction.class, $("introduction"), 5, 13))
                .execute()
                .print();
        // Option 2: register the function first, then call it by its registered name
        // Register the function
        tEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
        // Table API: use call(...) to invoke the registered UDF
        tEnv.from("users").select($("id"), $("name"),call(
                "SubstringFunction",
                $("introduction"), 5, 13
            )
        )
                .execute()
                .print();
        // SQL
        Table result = tEnv.sqlQuery("SELECT id,name,SubstringFunction(introduction, 5, 13) FROM users");
        result.printSchema();
        result.execute().print();
    }
    /**
     * The simplest possible scalar function
     */
    public static class SubstringFunction extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
}


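Key points for implementing a UDF


Base class

A UDF must extend the matching base class, for example ScalarFunction. The class must be declared public, non-abstract, and globally accessible, so non-static inner classes and anonymous classes are not allowed. It must also have a default constructor if it is to be stored in a catalog, because Flink has to instantiate it.



The eval method


The class must provide public eval methods with well-defined parameters. eval methods can be overloaded, take variable arguments, and be inherited from a superclass.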


public static class SumFunction extends ScalarFunction {
    // Overload 1: two integers
    public Integer eval(Integer a, Integer b) {
        return a + b;
    }

    // Overload 2: two numeric strings
    public Integer eval(String a, String b) {
        return Integer.valueOf(a) + Integer.valueOf(b);
    }

    // Overload 3: a variable number of doubles
    public Integer eval(Double... d) {
        double result = 0;
        for (double value : d) {
            result += value;
        }
        return (int) result;
    }
}


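Type inference


In most cases Flink can infer the parameter and return types of the eval methods automatically. For more complex cases, the two annotations @DataTypeHint and @FunctionHint supply data type hints that support the automatic type inference.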


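import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// A UDF with overloaded eval methods
public static class OverloadedFunction extends ScalarFunction {
    // No hint needed
    public Long eval(long a, long b) {
        return a + b;
    }
    // Define the precision of the DECIMAL result
    public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
        return BigDecimal.valueOf(a + b);
    }
    // Define a nested data type
    @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
    public Row eval(int i) {
        return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
    }
    // Accept input of any type and emit a custom serialized output
    // (MyUtils stands for your own serialization helper; it is not part of Flink)
    @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
    public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return MyUtils.serializeToByteBuffer(o);
    }
}

// A table function with overloaded eval methods (a separate snippet from the class above).
// The output (return) type is declared once, globally, for all eval methods.
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {
    public void eval(int a, int b) {
        collect(Row.of("Sum", a + b));
    }
    public void eval() {
        collect(Row.of("Empty args", -1));
    }
}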



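// Decouple type inference from the eval method: the function hints alone determine the types


@FunctionHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
        output = @DataTypeHint("INT")
)
@FunctionHint(
        input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
        output = @DataTypeHint("BIGINT")
)
@FunctionHint(
        input = {},
        output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {
    // A single generic eval; the hints above declare the accepted signatures
    public void eval(Object... o) {
        if (o.length == 0) {
            collect(false);
        } else {
            // The else branch keeps o[0] from being read on an empty argument list
            collect(o[0]);
        }
    }
}


Custom type inference



In most cases Flink infers the types on its own, at worst with the help of a hint. In special cases you need custom type inference, which you get by overriding the getTypeInference method.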


import java.util.Optional;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

public static class LiteralFunction extends ScalarFunction {
    public Object eval(String s, String type) {
        switch (type) {
            case "INT":
                return Integer.valueOf(s);
            case "DOUBLE":
                return Double.valueOf(s);
            case "STRING":
            default:
                return s;
        }
    }

    // Once getTypeInference is overridden, reflection-based type inference is disabled
    // automatically and replaced by the logic of this method
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                // Declare the argument types
                .typedArguments(DataTypes.STRING(), DataTypes.STRING())
                // Declare a strategy for the result data type of the function
                .outputTypeStrategy(callContext -> {
                    if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
                        throw callContext.newValidationError("Literal expected for second argument.");
                    }
                    // Return a data type based on the literal
                    final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
                    switch (literal) {
                        case "INT":
                            return Optional.of(DataTypes.INT().notNull());
                        case "DOUBLE":
                            return Optional.of(DataTypes.DOUBLE().notNull());
                        case "STRING":
                        default:
                            return Optional.of(DataTypes.STRING());
                    }
                })
                .build();
    }
}


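Using the runtime context

The open and close methods of the UDF base class can be overridden to add custom initialization and cleanup logic. The open method receives a FunctionContext parameter, which gives access to information about the runtime environment:


| Method | Purpose |
| --- | --- |
| getMetricGroup() | Returns the metric group of the current subtask |
| getCachedFile(name) | Returns the local temporary copy of a file from the distributed cache |
| getJobParameter(name, defaultValue) | Returns the value of a job parameter, or the default if it is not set |
| getExternalResourceInfos(resourceName) | Returns information about an external resource |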

public static class HashCodeFunction extends ScalarFunction {
    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Read the job parameter "hashcode_factor"; fall back to "12" if it is not set
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}


UDF




Parameterized scalar function

package com.blink.sb.udfs.udf;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

/**
 * Using a parameterized scalar function
 */
public class FlinkParameterizableScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Both sample rows share the same introduction text
        String introduction = "In my spare time, I like to do anything relating to English such as listening to English songs, watching English movies or TV programs, or even attending the activities held by some English clubs or institutes. I used to go abroad for a short- term English study. During that time, I learned a lot of daily life English and saw a lot of different things. I think language is very interesting. I could express one substance by using different sounds. So I wish I could study and read more English literatures and enlarge my knowledge";
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("introduction", DataTypes.STRING())
                ),
                row(1, "zhangsan", introduction),
                row(2L, "lisi", introduction)
        ).select($("id"), $("name"), $("introduction"));
        tEnv.createTemporaryView("users", userinfo);
        // Option 1: call the function inline with call(...) (no registration needed).
        // The function takes a constructor argument, so pass an instance;
        // call(SubstringFunction.class, ...) would require a default constructor.
        tEnv.from("users").select(call(new SubstringFunction(true), $("introduction"), 5, 13))
                .execute()
                .print();
        // Option 2: register the function first, then call it by its registered name.
        // Again an instance is registered instead of SubstringFunction.class.
        tEnv.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));
        // Table API: use call(...) to invoke the registered UDF
        tEnv.from("users").select(call("SubstringFunction", $("introduction"), 5, 13))
                .execute()
                .print();
        // SQL
        tEnv.sqlQuery("SELECT SubstringFunction(introduction, 5, 13) FROM users")
                .execute()
                .print();
    }

    /**
     * Parameterized scalar function
     */
    public static class SubstringFunction extends ScalarFunction {
        // Whether the end index is part of the result
        private boolean endInclusive;

        public SubstringFunction(boolean endInclusive) {
            this.endInclusive = endInclusive;
        }

        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, endInclusive ? end + 1 : end);
        }
    }
}

Wildcard scalar function


package com.blink.sb.udfs.udf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.Expressions.*;

/**
 * Using a scalar function with wildcard arguments
 */
public class FlinkWildcardScalarFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table userinfo = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT())
                ),
                row(1, "zhangsan", 23),
                row(2L, "lisi", 18)
        ).select($("id"), $("name"), $("age"));
        tEnv.createTemporaryView("users", userinfo);
        // Option 1: call the function inline with call(...) (no registration needed);
        // $("*") passes every column of the table to the function
        tEnv.from("users").select(call(MyConcatFunction.class, $("*")))
                .execute()
                .print();
        // Option 2: register the function first, then call it by its registered name
        tEnv.createTemporarySystemFunction("MyConcatFunction", new MyConcatFunction());
        // Table API: use call(...) to invoke the registered UDF
        tEnv.from("users").select(call("MyConcatFunction", $("id"), $("name")))
                .execute()
                .print();
        // SQL: the * wildcard cannot be passed to a function in Flink SQL,
        // so the following query fails and is left commented out
        // tEnv.sqlQuery("SELECT MyConcatFunction(*) FROM users")
        //         .execute()
        //         .print();
        // Listing the columns explicitly avoids the wildcard. The function must be
        // called by its registered name, MyConcatFunction.
        tEnv.sqlQuery("SELECT MyConcatFunction(id, name) FROM users")
                .execute()
                .print();
    }

    /**
     * A scalar function that supports wildcard arguments
     */
    public static class MyConcatFunction extends ScalarFunction {
        public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... fields) {
            return Arrays.stream(fields)
                    .map(Object::toString)
                    .collect(Collectors.joining(","));
        }
    }
}

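UDTF


  • Table functions take one row in and emit many rows out (explode). Extend TableFunction, provide eval methods that return nothing, and use collect to emit the output rows.


  • The result of a table function is a table, which must be joined with the original table to get the final result. This requires a lateral join (see LATERAL TABLE if this is unfamiliar).


The requirement is as follows:


A student's scores are stored in a single field:
1 zhangsan Chinese:90,Math:74,English:100
They need to be split by subject into multiple rows:
1 zhangsan Chinese 90
1 zhangsan Math 74
1 zhangsan English 100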


package com.blink.sb.udfs.udtf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;

/**
 * Table functions
 */
public class FlinkTableFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("scores", DataTypes.STRING())
                ),
                row(1, "zhangsan", "Chinese:90,Math:74,English:100"),
                row(2L, "lisi", "Chinese:86,Math:99,English:92")
        ).select($("id"), $("name"), $("scores"));
        tEnv.createTemporaryView("scoresTable", scores);
        // Register the function
        tEnv.createTemporarySystemFunction("ScoresSplitFunction", new ScoresSplitFunction());
        // Table API. Calling the table function directly inside select does not work:
        //        tEnv.from("scoresTable")
        //                .select($("id"), $("name"), call("ScoresSplitFunction", $("scores")))
        //                .execute()
        //                .print();
        // It must be combined with joinLateral (inner join) or leftOuterJoinLateral (left outer join)
        tEnv.from("scoresTable")
                .joinLateral(call(ScoresSplitFunction.class, $("scores"))
                        .as("subject", "score")
                )
                .select($("id"), $("name"), $("subject"), $("score"))
                .execute()
                .print();
        tEnv.from("scoresTable")
                .leftOuterJoinLateral(call(ScoresSplitFunction.class, $("scores"))
                        .as("subject", "score")
                )
                .select($("id"), $("name"), $("subject"), $("score"))
                .execute()
                .print();
        // SQL
        tEnv.sqlQuery("SELECT id, name, subject, score " + // the exploded field names must match those defined in the UDF
                        "FROM scoresTable, " + // note the comma
                        "LATERAL TABLE(ScoresSplitFunction(scores))")
                .execute()
                .print();
        // With a table alias, the exploded fields can be renamed
        tEnv.sqlQuery("SELECT id, name, subject1, score1 " +
                        "FROM scoresTable " +
                        "LEFT JOIN LATERAL TABLE(ScoresSplitFunction(scores)) AS sc(subject1, score1) ON TRUE")
                .execute()
                .print();
    }

    /**
     * A student's scores are stored in a single field:
     *      1 zhangsan Chinese:90,Math:74,English:100
     * They need to be split by subject into multiple rows:
     *      1 zhangsan Chinese 90
     *      1 zhangsan Math    74
     *      1 zhangsan English 100
     */
    @FunctionHint(output = @DataTypeHint("ROW<subject STRING, score INT>"))
    public static class ScoresSplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(",")) {
                String[] arr = s.split(":");
                // Emit a new row with collect
                collect(Row.of(arr[0], Integer.parseInt(arr[1])));
            }
        }
    }
}
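The difference between the inner and the left outer lateral join only shows up when the table function emits no rows for an input row. A minimal plain-Java sketch of that behavior follows, with no Flink dependency. The class LateralJoinSketch and its methods are hypothetical and only imitate the join semantics; the empty scores field for "mary" is an assumed input that does not appear in the example above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java imitation of joinLateral vs. leftOuterJoinLateral (not Flink code).
final class LateralJoinSketch {

    // Imitates the split function: "Chinese:90,Math:74" -> [Chinese, 90], [Math, 74].
    static List<String[]> split(String scores) {
        List<String[]> out = new ArrayList<>();
        if (scores == null || scores.isEmpty()) {
            return out; // the function emits no rows for this input
        }
        for (String s : scores.split(",")) {
            out.add(s.split(":"));
        }
        return out;
    }

    // Joins one input row with the rows produced from it.
    // keepUnmatched = false behaves like the inner lateral join,
    // keepUnmatched = true like the left outer lateral join (padded with null).
    static List<String> join(String name, String scores, boolean keepUnmatched) {
        List<String> result = new ArrayList<>();
        for (String[] r : split(scores)) {
            result.add(name + "," + r[0] + "," + r[1]);
        }
        if (result.isEmpty() && keepUnmatched) {
            result.add(name + ",null,null");
        }
        return result;
    }
}
```

With the inner variant a student without scores disappears from the result; with the left outer variant the student is kept and the exploded columns are null.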



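UDAF


Aggregate functions map the scalar values of multiple rows to a new scalar value (many in, one out). They rely on an accumulator. The figure below shows the aggregation process:


[Figure: aggregation process of an aggregate function]


  • Extend AggregateFunction
  • createAccumulator (creates an empty accumulator at initialization) and getValue (returns the accumulated result at the end) must be overridden
  • Provide an accumulate method, which is called for every incoming row to update the accumulator
  • The retract method is only required for aggregations on OVER windows
  • The merge method is required for bounded aggregations and for session window and sliding window aggregations (it also helps performance optimizations)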
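To illustrate how the methods listed above cooperate, here is a minimal plain-Java sketch of the accumulator lifecycle, with no Flink dependency. The class AccumulatorSketch and the two driver methods slidingAverages and mergedAverage are hypothetical; they only imitate when a runtime would call retract (a row leaves an OVER window) and merge (partial results are combined).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java imitation of the accumulator lifecycle (not Flink code).
final class AccumulatorSketch {

    // Mutable accumulator for an average: sum and count.
    static final class Acc {
        double sum = 0.0;
        int count = 0;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per incoming row.
    static void accumulate(Acc acc, double score) {
        acc.sum += score;
        acc.count += 1;
    }

    // Undoes an earlier accumulate, for example when a row leaves an OVER window.
    static void retract(Acc acc, double score) {
        acc.sum -= score;
        acc.count -= 1;
    }

    // Folds partial accumulators into acc.
    static void merge(Acc acc, List<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // Returns null for an empty accumulator, otherwise sum / count.
    static Double getValue(Acc acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    // Average over the last `size` rows: accumulate the new row, retract the one that drops out.
    static List<Double> slidingAverages(List<Double> scores, int size) {
        Acc acc = createAccumulator();
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < scores.size(); i++) {
            accumulate(acc, scores.get(i));
            if (i >= size) {
                retract(acc, scores.get(i - size));
            }
            result.add(getValue(acc));
        }
        return result;
    }

    // Two partitions are aggregated separately and then merged into one result.
    static Double mergedAverage(List<Double> left, List<Double> right) {
        Acc a = createAccumulator();
        for (double s : left) {
            accumulate(a, s);
        }
        Acc b = createAccumulator();
        for (double s : right) {
            accumulate(b, s);
        }
        List<Acc> partials = new ArrayList<>();
        partials.add(b);
        merge(a, partials);
        return getValue(a);
    }
}
```

Without retract the sliding variant would have to recompute every window from scratch, and without merge the two partitions could not be combined, which is roughly why those methods are tied to OVER windows and to bounded, session, and sliding window aggregations.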



Requirement: compute the average score of each subject

Input:
+------+----------+---------+-------+
|   id |     name | subject | score |
+------+----------+---------+-------+
| 1.00 | zhangsan | Chinese |    90 |
| 1.00 | zhangsan |    Math |    74 |
| 1.00 | zhangsan | English |   100 |
| 2.00 |     lisi | Chinese |    86 |
| 2.00 |     lisi |    Math |    99 |
| 2.00 |     lisi | English |    92 |
+------+----------+---------+-------+
Output:
+---------+-----------+
| subject | score_avg |
+---------+-----------+
| Chinese |      88.0 |
|    Math |      86.5 |
| English |      96.0 |
+---------+-----------+
package com.blink.sb.udfs.udaf;

import lombok.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.*;

/**
 * Aggregate functions
 */
public class FlinkAggFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("subject", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 100.0),
                row(2L, "lisi", "Chinese", 86.0),
                row(2L, "lisi", "Math", 99.0),
                row(2L, "lisi", "English", 92.0)
        ).select($("id"), $("name"), $("subject"), $("score"));
        tEnv.createTemporaryView("scoresTable", scores);
        // Register the function
        tEnv.createTemporarySystemFunction("AvgFunction", new AvgFunction());
        // Table API: use call(...) to invoke the registered UDF
        tEnv.from("scoresTable")
                .groupBy($("subject"))
                .select($("subject"), call("AvgFunction", $("score")).as("score_avg"))
                .execute()
                .print();
        // SQL
        tEnv.sqlQuery("SELECT subject, AvgFunction(score) as score_avg FROM scoresTable GROUP BY subject")
                .execute()
                .print();
    }

    /**
     * Mutable accumulator data structure.
     * The average is computed as sum / count.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AvgAccumulator {
        public double sum = 0.0;
        public int count = 0;
    }

    public static class AvgFunction extends AggregateFunction<Double, AvgAccumulator> {
        @Override
        public Double getValue(AvgAccumulator accumulator) {
            if (accumulator.count == 0) {
                return null;
            } else {
                return accumulator.sum / accumulator.count;
            }
        }

        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }

        // Called for every incoming row
        public void accumulate(AvgAccumulator acc, Double score) {
            acc.setSum(acc.sum + score);
            acc.setCount(acc.count + 1);
        }

        // Only required for aggregations on OVER windows
        //        public void retract(AvgAccumulator acc, Double score) {
        //            acc.setSum(acc.sum - score);
        //            acc.setCount(acc.count - 1);
        //        }

        // Required for bounded aggregations and for session window and sliding window
        // aggregations (it also helps performance optimizations)
        //        public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> it) {
        //            for (AvgAccumulator a : it) {
        //                acc.setSum(acc.sum + a.getSum());
        //                acc.setCount(acc.count + a.getCount());
        //            }
        //        }

        public void resetAccumulator(AvgAccumulator acc) {
            acc.count = 0;
            acc.sum = 0.0d;
        }
    }
}


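UDATF


Table aggregate functions take many rows in and emit many rows out (aggregate first, then explode). The figure below shows the aggregation process:


[Figure: aggregation process of a table aggregate function]




  1. Extend TableAggregateFunction
  2. createAccumulator must be overridden
  3. Provide one or more accumulate methods (usually just one) that implement the logic for updating the accumulator
  4. Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic for emitting the result
  5. The retract method is only required for aggregations on OVER windows
  6. The merge method is required for bounded aggregations and for session window and sliding window aggregations (it also helps performance optimizations)
  7. emitValue is required for bounded window aggregations; in unbounded scenarios, emitUpdateWithRetract can improve performance. If the accumulator has to hold large state, use org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView so that the state is kept in the Flink state backend
  8. The function must be used together with flatAggregate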
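The performance remark about emitUpdateWithRetract can be sketched in plain Java, with no Flink dependency. The class Top2EmitSketch is hypothetical, and the "+(value,rank)" and "-(value,rank)" strings are an ad hoc notation for emitted and retracted rows, not Flink's changelog format. The idea is that the accumulator remembers what it emitted last time and only touches the ranks whose value changed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java imitation of full vs. incremental top-2 emission (not Flink code).
final class Top2EmitSketch {
    // Marks an empty slot.
    static final double EMPTY = Double.NEGATIVE_INFINITY;

    double first = EMPTY;     // current largest value
    double second = EMPTY;    // current second-largest value
    double oldFirst = EMPTY;  // largest value at the last incremental emission
    double oldSecond = EMPTY; // second-largest value at the last incremental emission

    void accumulate(double v) {
        if (v > first) {
            second = first;
            first = v;
        } else if (v > second) {
            second = v;
        }
    }

    // Full emission: every call outputs the complete current result.
    List<String> emitValue() {
        List<String> out = new ArrayList<>();
        if (first != EMPTY) {
            out.add("+(" + first + ",1)");
        }
        if (second != EMPTY) {
            out.add("+(" + second + ",2)");
        }
        return out;
    }

    // Incremental emission: only ranks whose value changed are retracted and re-emitted.
    List<String> emitUpdateWithRetract() {
        List<String> out = new ArrayList<>();
        if (first != oldFirst) {
            if (oldFirst != EMPTY) {
                out.add("-(" + oldFirst + ",1)");
            }
            out.add("+(" + first + ",1)");
            oldFirst = first;
        }
        if (second != oldSecond) {
            if (oldSecond != EMPTY) {
                out.add("-(" + oldSecond + ",2)");
            }
            out.add("+(" + second + ",2)");
            oldSecond = second;
        }
        return out;
    }

    // Feeds the scores one by one and collects everything the incremental variant emits.
    static List<String> replay(double... scores) {
        Top2EmitSketch acc = new Top2EmitSketch();
        List<String> log = new ArrayList<>();
        for (double s : scores) {
            acc.accumulate(s);
            log.addAll(acc.emitUpdateWithRetract());
        }
        return log;
    }
}
```

A score that does not enter the top two produces no output at all in the incremental variant, whereas a full emission after every row would repeat both ranks each time.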





Requirement: find the top two scores of each subject


Input:
+----+----------+---------+-------+
| id |     name | subject | score |
+----+----------+---------+-------+
|  2 |     lisi | Chinese |  86.0 |
|  2 |     lisi |    Math |  96.0 |
|  2 |     lisi | English |  92.0 |
|  1 | zhangsan | Chinese |  90.0 |
|  1 | zhangsan |    Math |  74.0 |
|  1 | zhangsan | English |  88.0 |
|  3 |     mary | Chinese |  59.0 |
|  3 |     mary |    Math |  99.0 |
|  3 |     mary | English | 100.0 |
+----+----------+---------+-------+
Output:
+----+---------+-------+------+
| op | subject | score | rank |
+----+---------+-------+------+
| +I |    Math |  99.0 |    1 |
| +I |    Math |  96.0 |    2 |
| +I | Chinese |  90.0 |    1 |
| +I | Chinese |  86.0 |    2 |
| +I | English | 100.0 |    1 |
| +I | English |  92.0 |    2 |
+----+---------+-------+------+


Implementation:

package com.blink.sb.udfs.udatf;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;

/**
 * TableAggregateFunction
 */
public class FlinkUdatfFunction {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("subject", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 88.0),
                row(2, "lisi", "Chinese", 86.0),
                row(2, "lisi", "Math", 96.0),
                row(2, "lisi", "English", 92.0),
                row(3, "mary", "Chinese", 59.0),
                row(3, "mary", "Math", 99.0),
                row(3, "mary", "English", 100.0)
        ).select($("id"), $("name"), $("subject"), $("score"));
        scores.execute().print();
        tEnv.createTemporaryView("scoresTable", scores);
        // Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // Table API: use call(...) to invoke the registered UDF
        tEnv.from("scoresTable")
                .groupBy($("subject")) // groupBy is optional
                // The function must be called inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("subject"), $("score"), $("rank"))
                // Without groupBy, select only the emitted fields:
                //                .select($("score"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Mutable accumulator data structure
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Top2Accumulator {
        // Question for the reader: why not store the complete top-N records here?
        // Negative infinity marks an empty slot. Double.MIN_VALUE is the smallest
        // positive double, so it would wrongly exclude a score of 0.
        /**
         * The largest value
         */
        public Double topOne = Double.NEGATIVE_INFINITY;
        /**
         * The second-largest value
         */
        public Double topTwo = Double.NEGATIVE_INFINITY;
    }

    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        public void accumulate(Top2Accumulator acc, Double value) {
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit (value, rank) for every slot that has been filled
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}

