开发者社区> 问答> 正文

如何在Scala的Flink中定义KeySelector?

"我有一个流媒体流程基本上是这样的

Stream(Int, Boolean, Int).Keyby(0, 1).Window().process()
关键是我想要定义一个组合键然后处理它。但是,如果我使用keyby(0, 1)和process(... Key: (Int, Boolean), ...),进程中的键类型总是提示错误。我试图定义keyby(_._1, _._2),但不正确。

所以,无论如何使用scala定义组合键,以便我可以推断出类似于(Int, Boolean)以下过程函数的键类型?
"

展开
收起
flink小助手 2018-11-28 16:29:58 4573 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "问题是,input.keyBy(0, 1).timeWindow(Time.days(1))创建一个KeyedStream[(Int, Boolean, Int), Tuple]地方Tuple是flink的元组类。这也是process函数关键参数的类型。为了访问的领域Tuple,你需要调用tuple.[T]getField(idx)与T作为字段的类型。

    如果你想让Scala元组作为ProcessWindowFunction你需要定义一个的关键KeySelector。以下代码片段可以解决问题:

    input
    .keyBy(a => (a._1, a._2))
    .timeWindow(Time.days(1))
    .process(new ProcessWindowFunction[(Int, Boolean, Int), Int, (Int, Boolean), TimeWindow] {

    override def process(key: (Int, Boolean), context: Context, elements: Iterable[(Int, Boolean, Int)], out: Collector[Int]): Unit = {
      out.collect(key._1)
    }

    })"

    2019-07-17 23:16:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink峰会 - 徐榜江 立即下载
Just Enough Scala for Spark 立即下载
JDK8新特性与生产-for“华东地区scala爱好者聚会” 立即下载