开发者社区> 问答> 正文

Flink 周期性创建watermark,200ms的周期是怎么控制的

各位大佬, 今天看flink 指派Timestamp 和watermark 的源码,发现周期性创建 watermark 确实是周期性的,从打印到控制台的时间可以看到差不多是200毫秒执行一 次, 200毫秒是在哪里控制的,在debug 的调用栈找不到(源码位置)? 周期新创建watermark 方法如下: .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorEvent 生成Timestamp的方法: TimestampsAndPeriodicWatermarksOperator 类的 : @Override public void processElement(StreamRecord element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

output.collect(element.replace(element.getValue(), newTimestamp)); }

生成watermark的方法:

TimestampsAndPeriodicWatermarksOperator 类的 :

@Override public void onProcessingTime(long timestamp) throws Exception { // 从这里可以看到,每200ms 打印一次 System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis()); // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); }

long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } 感谢各位大佬*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-08 11:04:13 607 0
1 条回答
写回答
取消 提交回答
  • 基于EventTIme的Watermark间隔默认200ms,可以通过ExecutionConfig的setAutoWatermarkInterval方法进行设置,见StreamExecutionEnvironment:

    /** * Sets the time characteristic for all streams create from this environment, e.g., processing * time, event time, or ingestion time. * *

    If you set the characteristic to IngestionTime of EventTime this will set a default * watermark update interval of 200 ms. If this is not applicable for your application * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}. * * @param characteristic The time characteristic. */ @PublicEvolving public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { this.timeCharacteristic = Preconditions.checkNotNull(characteristic); if (characteristic == TimeCharacteristic.ProcessingTime) { getConfig().setAutoWatermarkInterval(0); } else { getConfig().setAutoWatermarkInterval(200); } }*来自志愿者整理的flink邮件归档

    2021-12-08 19:43:03
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载