开发者社区> 问答> 正文

处理时间窗口不适用于Apache Flink中的有限数据源

我正在尝试将一个非常简单的窗口函数应用于Apache Flink中的有限数据流(本地,没有集群)。这是一个例子:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))

.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {

override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
  out.collect(elements.toList.sorted.toString())
}

})

.print()

env.execute()
在这里,我尝试在一秒钟内将所有到达窗口的元素分组,然后打印这些组。

我假设所有元素都会在不到一秒的时间内生成并进入一个窗口,因此将会有一个传入元素print()。但是,当我运行它时,根本没有打印任何内容。

如果我删除所有窗口的东西,比如

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
我看到运行后打印的元素。我也用文件源尝试了这个,没有区别。

我的机器上的默认并行度是6.如果我试验并行度和延迟级别,就像这样

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
我能够将一些 - 而不是所有 - 元素分成组,这些组被打印出来。

我的第一个假设是源完成的速度比1秒快得多,并且在窗口的计时器触发之前关闭任务。调试显示已到达定时器设置行ProcessingTimeTrigger。不应该在任务关闭之前完成所有启动的计时器(至少这是我从代码中获得的印象)?

你能帮助我理解这个并使其更具确定性吗?

更新#1,23.09.2018:

我还尝试了事件时间窗口而不是处理时间窗口。如果我这样做:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {

override def extractAscendingTimestamp(element: String): Long = {
  element.charAt(0).toInt
}

})

.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.trigger(EventTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {

override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
  out.collect(elements.toList.toString())
}

})

.print()

env.execute()
然后再没有打印任何东西。调试器显示onElement为每个元素调用触发器,但从onEventTime不调用。

此外,如果我修改时间戳提取器以执行更大的步骤:

element.charAt(0).toInt * 1000
所有元素都打印出来(每组一个元素,这是预期的),除了最后一个。

展开
收起
flink小助手 2018-12-10 11:12:15 2194 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    当有限源到达终点时,如果您正在使用事件时间,则将注入时间戳为Long.MAX_VALUE的水印,这将导致所有事件时间计时器触发。但是,随着处理时间的推移,Flink将等待所有当前触发计时器完成其操作,然后退出。

    如您所料,您没有看到任何输出,因为源完成得非常快。

    事件时间处理确定性行为很简单; 处理时间不是真的可以实现。

    但却是有效
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val s = env.fromCollection(List("a", "b", "c", "d", "e"))
    val t = env.addSource((context: SourceContext[String]) => {
    while(true) {

    Thread.sleep(100)
    context.collect("dummy")

    }
    })

    s.union(t)
    .filter(_ != "dummy")
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .process(new ProcessAllWindowFunction[String, String, TimeWindow] {

    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }

    })
    .print()

    env.execute()

    2019-07-17 23:19:05
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像