01 引言
在Flink Table API
中除了提供大量的内建函数之外,用户也能够实现自定义函数,这样极大地拓展了Table API
和 SQL
的计算表达能力,使得用户能够更加方便灵活地使用 Table APIE 或 SQL 编写 Flink)应用。
但需要注意的是,自定义函数主要在
Table API
和SQL
中使用,对于DataStream
和DataSet APl
的应用,则无须借助自定义函数实现,只要在相应接口代码中构建计算函数逻辑即可。
02 如何注册自定义函数?
通常情况下,用户自定义的函数需要在 Flink TableEnvironment
中进行注册,然后才能在Table API
和SQL
中使用。
函数注册通过 TableEnvironment
的registerFunction()
方法完成,本质上是将用户自定义好的 Function
注册到TableEnvironment
中的 Function CataLog
中,每次在调用的过程中直接到 CataLog
中获取函数信息。
Flink
目前没有提供持久化注册的接口,因此需要每次在启动应用的时候重新对函数进行注册,且当应用被关闭后,TableEnvironment
中已经注册的函数信息将会被清理。
03 自定义函数分类
在 Table API
中,根处理的数据类型以及计算方式的不同将自定义函数一共分为三种类别,分别为 :
- Scalar Function
- Table Function
- Aggregation Function
3.1 Scalar Function
Scalar Function
也被称为标量函数:表示对单个输入或者多个输入字段计算后返回一个确定类型的标量值,其返回值类型可以为除 TEXT
、NTEXT
、IMAGE、CURSOR
、TIMESTAMP
和 TABLE
类型外的其他所有数据类型。例如 Flink
常见的内置标量函数有 DATE()、UPPER()、LTRIM ()
等。
注意:在自定义标量函数中,用户需要确认
Flink
内部是否已经实现相应的Scalar Fuction
,如果已经实现则可以直接使用;如果没有实现,则在注册自定义函数过程中,需要和内置的其他Scalar Function
名称区分,否则会导致注册函数失败,影响应用的正常执行。
3.1.1 Scalar Function如何实现?
代码实现方式:
- 定义
Scalar Function
需要继承org. Apache. Fink. Table. Functions
包中的ScalarFunction
类; - 同时实现类中的
evaluation
方法,自定义函数计算逻辑需要在该方法中定义,同时该方法必须声明为public
; - 将方法名称定义为
eval
; - 同时在一个
ScalarFunction
实现类中可以定义多个evaluation
方法,只需要保证传递进来的参数不相同即可。
3.1.2 Scalar Function举例
通过定义
Add Class
并继承ScalarFunction
接口,实现对两个数值相加的功能。然后在Table Select
操作符和SQL
语句中使用。
举例:自定义实现 Scalar Function
实现字符串长度获取。
3.1.3 特殊返回值处理
在自定义标量函数过程中,函数的返回值类型必须为标量值,尽管 Flink
内部已经定义了大部分的基本数据类型以及 POJOs
类型等,但有些比较复杂的数据类型如果Flink
不支持获取。
此时需要用户通过继承并实现 ScalarFunction
类中的 getResultType3
实现 getResult--Type
方法对数据类型进行转换。
例如:在 Table API
和SQL
中可能需要使用 Types. TIMESTAMP
数据类型,但是基于 ScalarFunction
得出的只能是Long
类型,因此可以通过实现getResultType
:方法对结果数据进行类型转换,从而返回 Timestamp
类型。
3.2 Table Function
和 Scalar Function
不同,Table Function
:将一个或多个标量字段作为输入参数,且经过计算和处理后返回的是任意数量的记录,不再是单独的一个标量指标,且返回结果中可以含有一列或多列指标,根据自定义 Table Funciton
函数返回值确定,因此从形式上看更像是 Table
结构数据 。
3.2.1 Table Function如何实现?
代码实现方式:
- 定义
Table Function
需要继承org. Apache. Fink. Table. Functions
包中的TableFunction
类; - 实现类中的
evaluation
方法,且所有的自定义函数计算逻辑均在该方法中定义 - 需要注意方法必须声明为 public,且名称必须定义为 eval;
- 另外在一个
TableFunction
,实现类中可以实现多个 evaluation方法,只需要保证参数不相同即可。
在
Scala
语言Table API
中,Table Function
可以用在Join、LeftOuterJoin
算子中,Table Function
相当于产生一张被关联的表,主表中的数据会与Table Function
所有产生的数据进行交叉关联。其中LeftOuterJoin
算子当Table Function
产生结果为空时,Table Function 产
生的字段会被填为空值。
3.2.2 Table Function举例
在应用
Table Function
之前,需要事先在TableEnvironment
中注册Table Function
,然后结合LATERAL TABLE
关键字使用,根据语句结尾是否增加ON TRUE
关键字来区分是Join
还是leftOuterJoin
操作。
举例:通过自定义 SplitFunction Class
继承 TableFunction
接口,实现根据指定切割符来切分输入字符串,并获取每个字符的长度和 HashCode
的功能,然后在 Table Select
操作符和SQL
语句中使用定义的 SplitFunction
。
自定义Table Funciton
实现将给定字符串切分成多条记录:
和 Scalar Function()
一样,对于不支持的输出结果类型,可以通过实现 TableFunction
接口中的 getResultType()
对输出结果的数据类型进行转换,具体可以参考 ScalarFunciton
定义。
3.3 Aggregation Function
Flink Table API
中提供了User-Defined Aggregate Functions (UDAGGs)
,其主要功能是将一行或多行数据进行聚合然后输出一个标量值,例如在数据集中根据 Key
求取指定Value
的最大值或最小值。
3.3.1 Aggregation Function如何实现?
自定义 Aggregation Function
需要创建 Class
实现org. Apache. Fink. Table. Functions
包中的AggregateFunction
类。
3.3.2 Aggregation Function举例
关于AggregateFunction
的接口定义如代码所示可以看出 AggregateFunction
定义相对比较复杂。
在 AggregateFunction
抽象类中包含了必须实现的方法createAccumulator()、accumulate()、getValue()
,其中:
- createAccumulator() 方法:主要用于创建
Accumulator
,以用于存储计算过程中读取的中间数据,同时在Accumulator
中完成数据的累加操作; - accumulate() 方法:将每次接入的数据元素累加到定义的
accumulator
中,另外accumulate()
方法也可以通过方法复载的方式处理不同类型的数据; - 当完成所有的数据累加操作结束后,最后通过 getValue() 方法返回函数的统计结果,最终完成整个
AggregateFunction
的计算流程。
除了以上三个必须要实现的方法之外,在 Aggregation Function
中还有根据具体使用场景选择性实现的方法,如 retract()、merge()、resetAccumulator()
等方法。其中:
- retract()方法:是在基于
Bouded Over Windows
的自定义聚合算子中使用 - merge():方法是在多批聚合和
Session Window
场景中使用 - resetAccumulator():方法是在批量计算中多批聚合的场景中使用,主要对
accumulator
计数器进行重置操作。
因为目前在 Flink
中对Scala
的类型参数提取效率相对较低,因此Flink
建议用
户尽可能实现 Java
语言的 Aggregation Function
,同时应尽可能使用原始数据类型,例如 Int、Long
等,避免使用复合数据类型,如自定义 POJOs
等,这样做的主要原因是在Aggregation Function
计算过程中,期间会有大量对象被创建和销毁,将对整个系统的性能造成一定的影响。
04 文末
本文主要讲解Flink
自定义函数,谢谢大家的阅读,本文完!