开发者社区> 问答> 正文

请教一下 flink stream API 代码里面怎么获取window 的起始时间 和结束时间?

请教一下 flink stream API 代码里面怎么获取window 的起始时间 和结束时间?

展开
收起
flink小助手 2018-11-22 11:00:52 3788 0
1 条回答
写回答
取消 提交回答
  • TimeWindow有getStart()和getEnd()方法,参见示例代码:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(PARALLELISM);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(100);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
    env.getConfig().disableSysoutLogging();
    
    env.addSource(new FailingSource(numKeys,
                    numElementsPerKey,
                    numElementsPerKey / 3))
            .rebalance()
            .timeWindowAll(Time.of(windowSize, MILLISECONDS))
            .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() {
    
                private boolean open = false;
    
                @Override
                public void open(Configuration parameters) {
                    assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
                    open = true;
                }
    
                @Override
                public void apply(
                        TimeWindow window,
                        Iterable<Tuple2<Long, IntType>> values,
                        Collector<Tuple4<Long, Long, Long, IntType>> out) {
    
                    // validate that the function has been opened properly
                    assertTrue(open);
    
                    int sum = 0;
                    long key = -1;
    
                    for (Tuple2<Long, IntType> value : values) {
                        sum += value.f1.value;
                        key = value.f0;
                    }
                    out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                }
            })
            .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
    
    env.execute("Tumbling Window Test");

    TimeWindow有getStart()和getEnd()方法。

    2019-07-17 23:15:53
    赞同 2 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载