开发者社区> 问答> 正文

Flink WindowFunction折叠

我创建了一个滑动窗口,并希望递归打包所有元素进入该窗口期间,这是代码的一部分

.map(x => ((x.pickup.get.latitude, x.pickup.get.longitude), (x.dropoff.get.latitude, x.dropoff.get.longitude)))

    .windowAll(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .fold(List[((Double, Double), (Double, Double))]) {(acc, v) => acc :+ ((v._1._1, v._1._2), (v._2._1, v._2._2))}

我希望创建一个List元素所在的元素tuple,但这不起作用。

我试过这个并且它有效:

val l2 : List[((Int, Int), (Int, Int))] = List(((1, 1), (2, 2)))
val newl2 = l2 :+ ((3, 3), (4, 4))
我怎样才能做到这一点?

展开
收起
社区小助手 2018-12-11 15:52:51 2127 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    fold函数的第一个参数需要是初始值而不是类型。将最后一行更改为:

    .fold(List.empty[((Long, Long), (Long, Long))]) {(acc, v) => acc :+ ((v._1._1, v._1._2), (v._2._1, v._2._2))}

    2019-07-17 23:19:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载