开发者社区> 问答> 正文

Flink SQL 自定义函数表值聚合函数(UDTAGG)的示例是什么?

Flink SQL 自定义函数表值聚合函数(UDTAGG)的示例是什么?

展开
收起
游客yzrzs5mf6j7yy 2021-12-07 20:43:50 558 0
1 条回答
写回答
取消 提交回答
  • import java.lang.{Integer => JInteger}
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.TableAggregateFunction
     
    /**
     * Accumulator for top2.
     */
    class Top2Accum {
      var first: JInteger = _
      var second: JInteger = _
    }
     
    /**
     * The top2 user-defined table aggregate function.
     */
    class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {
     
      override def createAccumulator(): Top2Accum = {
        val acc = new Top2Accum
        acc.first = Int.MinValue
        acc.second = Int.MinValue
        acc
      }
     
      def accumulate(acc: Top2Accum, v: Int) {
        if (v > acc.first) {
          acc.second = acc.first
          acc.first = v
        } else if (v > acc.second) {
          acc.second = v
        }
      }
     
      def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
        val iter = its.iterator()
        while (iter.hasNext) {
          val top2 = iter.next()
          accumulate(acc, top2.first)
          accumulate(acc, top2.second)
        }
      }
     
      def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
        // emit the value and rank
        if (acc.first != Int.MinValue) {
          out.collect(JTuple2.of(acc.first, 1))
        }
        if (acc.second != Int.MinValue) {
          out.collect(JTuple2.of(acc.second, 2))
        }
      }
    }
     
    // 初始化表
    val tab = ...
     
    // 使用函数
    tab
      .groupBy('key)
      .flatAggregate(top2('a) as ('v, 'rank))
      .select('key, 'v, 'rank)
    
    2021-12-07 20:44:12
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载