开发者社区> 问答> 正文

Flink窗口:聚合和输出到接收器

我们有一个数据流,其中每个元素都是这种类型:

id: String
type: Type
amount: Integer
我们希望聚合此流并输出amount每周一次的总和。

当前解决方案

示例flink管道将如下所示:

stream.keyBy(type)

  .window(TumblingProcessingTimeWindows.of(Time.days(7)))
  .reduce(sumAmount())
  .addSink(someOutput())

用于输入

| id | type | amount |
| 1 | CAT | 10 |
| 2 | DOG | 20 |
| 3 | CAT | 5 |
| 4 | DOG | 15 |
| 5 | DOG | 50 |
如果窗口在记录3和4我们的输出之间结束将是:

| TYPE | sumAmount |
| CAT | 15 | (id 1 and id 3 added together)
| DOG | 20 | (only id 2 as been 'summed')
标识4和5仍然是弗林克管道内,下周将被输出。

因此,下周我们的总产量将是:

| TYPE | sumAmount |
| CAT | 15 | (of last week)
| DOG | 20 | (of last week)
| DOG | 65 | (id 4 and id 5 added together)
新要求:

我们现在还想知道每条记录在哪一周处理了每条记录。换句话说,我们的新输出应该是:

| TYPE | sumAmount | weekNumber |
| CAT | 15 | 1 |
| DOG | 20 | 1 |
| DOG | 65 | 2 |
但我们还想要一个像这样的额外输出:

| id | weekNumber |
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| 4 | 2 |
| 5 | 2 |
怎么办呢?

flink有没有办法实现这个目标?我会想象我们会有一个汇总函数的汇总函数,但也会输出每个记录和当前周数,但我没有找到在文档中执行此操作的方法。

(注意:我们每周处理大约1亿条记录,所以理想情况下我们只希望在一周内将聚合保持在flink状态,而不是所有单个记录)

编辑:

我去了Anton描述的解决方案:

DataStream elements =
stream.keyBy(type)

    .process(myKeyedProcessFunction());

elements.addSink(outputElements());
elements.getSideOutput(outputTag)

    .addSink(outputAggregates())

而KeyedProcessFunction看起来像:

class MyKeyedProcessFunction extends KeyedProcessFunction

private ValueState<ZonedDateTime> state;
private ValueState<Integer> sum;

public void processElement(Element e, Context c, Collector<Element> out) {
    if (state.value() == null) {
        state.update(ZonedDateTime.now());
        sum.update(0);
        c.timerService().registerProcessingTimeTimer(nowPlus7Days);
    }
    element.addAggregationId(state.value());
    sum.update(sum.value() + element.getAmount());
}

public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) {
    state.update(null);
    c.output(outputTag, sum.value()); 
}

}

展开
收起
社区小助手 2018-12-11 16:19:31 4345 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    reduce方法有一种变体,它将ProcessWindowFunction作为第二个参数。你会像这样使用它:

    stream.keyBy(type)
    .window(TumblingProcessingTimeWindows.of(Time.days(7)))
    .reduce(sumAmount(), new WrapWithWeek())
    .addSink(someOutput())

    private static class WrapWithWeek
    extends ProcessWindowFunction, Type, TimeWindow> {

      public void process(Type key,
                Context context,
                Iterable<Event> reducedEvents,
                Collector<Tuple3<Type, Long, Long>> out) {
          Long sum = reducedEvents.iterator().next();
          out.collect(new Tuple3<Type, Long, Long>(key, context.window.getStart(), sum));
      }

    }
    通常,ProcessWindowFunction传递一个Iterable来保存窗口收集的所有事件,但是如果使用reduce或aggregate函数预聚合窗口结果,那么只有那个值传递给Iterable。这里的文档在这里,https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#incremental-window-aggregation-with-reducefunction但文档中的示例目前有一个小错误,我已在我的示例中修复了这里。

    但鉴于对第二个输出的新要求,我建议您放弃使用Windows执行此操作的想法,而是使用带键的ProcessFunction。你需要两个每个键值的ValueState:一个按周计算,另一个用于存储总和。您需要一个每周触发一次的计时器:当它触发时,它应该发出类型,总和和周数,然后增加周数。同时,process元素方法将简单地输出每个传入事件的ID以及周计数器的值。

    2019-07-17 23:19:51
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载