开发者社区> 问答> 正文

Flink SQL 自定义聚合函数(UDAGG)的示例是什么?

Flink SQL 自定义聚合函数(UDAGG)的示例是什么?

展开
收起
游客yzrzs5mf6j7yy 2021-12-07 20:42:41 530 0
1 条回答
写回答
取消 提交回答
  • import java.lang.{Long => JLong, Integer => JInteger}
    import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    import org.apache.flink.api.java.typeutils.TupleTypeInfo
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.AggregateFunction
     
    /**
     * Accumulator for WeightedAvg.
     *
     * Backed by a two-field Flink Java tuple: `f0` holds the running weighted sum and
     * `f1` holds the running total weight. `sum` and `count` are named accessors over
     * those two fields, so code can write `acc.sum += ...` / `acc.count += ...` while the
     * class remains a tuple type (field order: LONG, INT).
     */
    class WeightedAvgAccum extends org.apache.flink.api.java.tuple.Tuple2[JLong, JInteger] {
      // Start from an empty aggregate so neither tuple field is ever null.
      f0 = 0L
      f1 = 0

      /** Running sum of value * weight (tuple field `f0`). */
      def sum: JLong = f0

      /** Setter backing `acc.sum = ...`, `acc.sum += ...` and `acc.sum -= ...`. */
      def sum_=(value: JLong): Unit = {
        f0 = value
      }

      /** Running sum of the weights (tuple field `f1`). */
      def count: JInteger = f1

      /** Setter backing `acc.count = ...`, `acc.count += ...` and `acc.count -= ...`. */
      def count_=(value: JInteger): Unit = {
        f1 = value
      }
    }
     
    /**
     * Weighted Average user-defined aggregate function.
     *
     * Takes a (value, weight) pair per input row and produces
     * `sum(value * weight) / sum(weight)` using integer (Long) division.
     * The result is `null` when the accumulated weight is zero.
     *
     * `accumulate`, `retract`, `merge` and `resetAccumulator` are plain methods rather
     * than overrides; see the Flink user-defined aggregate function documentation for
     * when each one is required.
     */
    class WeightedAvg extends AggregateFunction[JLong, WeightedAvgAccum] {

      // Imported locally so this class does not depend on a file-level import.
      import org.apache.flink.api.common.typeinfo.TypeInformation

      /** Creates a fresh, empty accumulator. */
      override def createAccumulator(): WeightedAvgAccum = {
        new WeightedAvgAccum
      }

      /**
       * Computes the aggregate result from the accumulator.
       *
       * @param acc the accumulator holding the weighted sum and total weight
       * @return the weighted average, or `null` if the total weight is zero
       */
      override def getValue(acc: WeightedAvgAccum): JLong = {
        if (acc.count == 0) {
          // No weight accumulated: avoid dividing by zero and report "no value".
          null
        } else {
          acc.sum / acc.count
        }
      }

      /**
       * Adds one input row to the accumulator.
       *
       * @param acc     the accumulator to update in place
       * @param iValue  the value of the row
       * @param iWeight the weight of the row
       */
      def accumulate(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
        acc.sum += iValue * iWeight
        acc.count += iWeight
      }

      /**
       * Removes one previously accumulated row; the exact inverse of `accumulate`.
       *
       * @param acc     the accumulator to update in place
       * @param iValue  the value of the row being retracted
       * @param iWeight the weight of the row being retracted
       */
      def retract(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = {
        acc.sum -= iValue * iWeight
        acc.count -= iWeight
      }

      /**
       * Folds a group of other accumulators into `acc`.
       *
       * @param acc the accumulator that receives the merged result
       * @param it  the accumulators to merge into `acc`
       */
      def merge(acc: WeightedAvgAccum, it: java.lang.Iterable[WeightedAvgAccum]): Unit = {
        val iter = it.iterator()
        while (iter.hasNext) {
          val a = iter.next()
          acc.count += a.count
          acc.sum += a.sum
        }
      }

      /** Resets the accumulator to its empty state (sum 0, count 0). */
      def resetAccumulator(acc: WeightedAvgAccum): Unit = {
        acc.count = 0
        acc.sum = 0L
      }

      /** Describes the accumulator as a tuple type with a LONG field followed by an INT field. */
      override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
        new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
      }

      /** The aggregate result is a (nullable) boxed Long. */
      override def getResultType: TypeInformation[JLong] = Types.LONG
    }
     
    // Register the function under the SQL name "wAvg".
    val tEnv: StreamTableEnvironment = ???
    tEnv.registerFunction("wAvg", new WeightedAvg())

    // Use the function. USER is a reserved keyword in Flink SQL, so the column name
    // is quoted with backticks to make it refer to the column.
    tEnv.sqlQuery("SELECT `user`, wAvg(points, level) AS avgPoints FROM userScores GROUP BY `user`")
    
    2021-12-07 20:43:18
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载