请教一下 flink stream API 代码里面怎么获取window 的起始时间 和结束时间?
TimeWindow有getStart()和getEnd()方法,参见示例代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.addSource(new FailingSource(numKeys,
numElementsPerKey,
numElementsPerKey / 3))
.rebalance()
.timeWindowAll(Time.of(windowSize, MILLISECONDS))
.apply(new RichAllWindowFunction, Tuple4, TimeWindow>() {
private boolean open = false;
@Override
public void open(Configuration parameters) {
assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
open = true;
}
@Override
public void apply(
TimeWindow window,
Iterable> values,
Collector> out) {
// validate that the function has been opened properly
assertTrue(open);
int sum = 0;
long key = -1;
for (Tuple2 value : values) {
sum += value.f1.value;
key = value.f0;
}
out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
}
})
.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
env.execute('Tumbling Window Test');
TimeWindow有getStart()和getEnd()方法。
赞2
踩0