Flink SQL 自定义函数表值函数(UDTF)的示例是什么?
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.api._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
// use collect(...) to emit a row
str.split(" ").foreach(s => collect(Row.of(s, Int.box(s.length))))
}
}
val env = TableEnvironment.create(...)
// 在 Table API 里不经注册直接“内联”调用函数
env
.from("MyTable")
.joinLateral(call(classOf[SplitFunction], $"myField")
.select($"myField", $"word", $"length")
env
.from("MyTable")
.leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
.select($"myField", $"word", $"length")
// 在 Table API 里重命名函数字段
env
.from("MyTable")
.leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord", "newLength"))
.select($"myField", $"newWord", $"newLength")
// 注册函数
env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
// 在 Table API 里调用注册好的函数
env
.from("MyTable")
.joinLateral(call("SplitFunction", $"myField"))
.select($"myField", $"word", $"length")
env
.from("MyTable")
.leftOuterJoinLateral(call("SplitFunction", $"myField"))
.select($"myField", $"word", $"length")
// 在 SQL 里调用注册好的函数
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
// 在 SQL 里重命名函数字段
env.sqlQuery(
"SELECT myField, newWord, newLength " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE")
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。