开发者社区> 问答> 正文

Flink SQL 自定义函数表值函数(UDTF)的示例是什么?

Flink SQL 自定义函数表值函数(UDTF)的示例是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 20:38:27 602 0
1 条回答
写回答
取消 提交回答
  • import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.annotation.FunctionHint
    import org.apache.flink.table.api._
    import org.apache.flink.table.functions.TableFunction
    import org.apache.flink.types.Row
     
    @FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
    class SplitFunction extends TableFunction[Row] {
     
      def eval(str: String): Unit = {
        // use collect(...) to emit a row
        str.split(" ").foreach(s => collect(Row.of(s, Int.box(s.length))))
      }
    }
     
    val env = TableEnvironment.create(...)
     
    // 在 Table API 里不经注册直接“内联”调用函数
    env
      .from("MyTable")
      .joinLateral(call(classOf[SplitFunction], $"myField")
      .select($"myField", $"word", $"length")
    env
      .from("MyTable")
      .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
      .select($"myField", $"word", $"length")
     
    // 在 Table API 里重命名函数字段
    env
      .from("MyTable")
      .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord", "newLength"))
      .select($"myField", $"newWord", $"newLength")
     
    // 注册函数
    env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
     
    // 在 Table API 里调用注册好的函数
    env
      .from("MyTable")
      .joinLateral(call("SplitFunction", $"myField"))
      .select($"myField", $"word", $"length")
    env
      .from("MyTable")
      .leftOuterJoinLateral(call("SplitFunction", $"myField"))
      .select($"myField", $"word", $"length")
     
    // 在 SQL 里调用注册好的函数
    env.sqlQuery(
      "SELECT myField, word, length " +
      "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
    env.sqlQuery(
      "SELECT myField, word, length " +
      "FROM MyTable " +
      "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
     
    // 在 SQL 里重命名函数字段
    env.sqlQuery(
      "SELECT myField, newWord, newLength " +
      "FROM MyTable " +
      "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE")
    
    2021-12-07 20:38:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载