开发者社区> 问答> 正文

使用Flink在键控窗口中获取计数

我通过Scala接口使用Flink进行一些数据处理。我有一些元组中的用户数据:

(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")
我想要由用户键入,创建一个窗口,然后计算用户在该窗口中查看特定电影的次数,这样我最终得到每个电影的map到每个用户的视图数量。例如,对于user1,正确的输出是Map("titanic" -> 2, "batman" -> 1)。我知道我的代码的第一部分应该是这样的:

keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))

但我不知道如何在窗口内进行进一步聚合,以便最终得到每个用户/窗口的视图计数。我试图编写自己的AggregateFunction,将这些计数收集到一个可变的Map中,但不幸的是,一个可变的Map不可序列化,所以它失败了。

我怎么能这样做?我通过Scala接口使用Flink进行一些数据处理。我有一些元组中的用户数据:

(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")
我想要由用户键入,创建一个窗口,然后计算用户在该窗口中查看特定电影的次数,这样我最终得到每个电影的map到每个用户的视图数量。例如,对于user1,正确的输出是Map("titanic" -> 2, "batman" -> 1)。我知道我的代码的第一部分应该是这样的:

keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))

但我不知道如何在窗口内进行进一步聚合,以便最终得到每个用户/窗口的视图计数。我试图编写自己的AggregateFunction,将这些计数收集到一个可变的Map中,但不幸的是,一个可变的Map不可序列化,所以它失败了。

我怎么能这样做?

展开
收起
flink小助手 2018-12-10 10:22:33 2432 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    通过使用以下方法解决问题AggregateFunction:

    source
    .keyBy(0)
    .timeWindow(Time.seconds(10L))
    .aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {

    override def createAccumulator(): (String, Map[String, Int]) = ("", Map())
    
    override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
      val counter = accumulator._2.getOrElse(value._2, 0)
      (value._1, accumulator._2 + (value._2 -> (counter + 1)))
    }
    
    override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator
    
    override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
      (a._1, (a._2.keySet ++ b._2.keySet) map (k => k -> (a._2.getOrElse(k, 0) + b._2.getOrElse(k, 0))) toMap)
    }

    })

    2019-07-17 23:19:03
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载