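- Example
.<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0)) // bounded-out-of-orderness strategy: watermark = max event time seen so far - tolerated out-of-orderness - 1 ms
- Source code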
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.eventtime;

import org.apache.flink.annotation.Public;

import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
 * the periodic interval length, plus the out-of-orderness bound.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
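To make the arithmetic in the source concrete, here is a minimal plain-Java sketch of the same bookkeeping. It is not the Flink class: `BoundedWatermarkSketch` and its method names are made up for illustration, and it depends only on the JDK. It only shows that the watermark trails the largest timestamp seen so far by the bound plus 1 ms and never moves backwards.

```java
import java.time.Duration;

/** Plain-Java stand-in for the generator logic above (hypothetical class, no Flink dependency). */
class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Same starting point as the Flink source: the first watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Mirrors onEvent: only the largest timestamp seen so far is remembered. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Mirrors onPeriodicEmit: the value that would be emitted as the watermark. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(Duration.ofSeconds(3));
        // The third event is out of order and does not move the watermark.
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a bound of 0, as in the example above, the watermark is simply the largest timestamp minus 1 ms.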
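Medium lateness - waiting in event time
- allowedLateness (window)
Sets how long elements are allowed to be late. An element is dropped once the watermark has passed the end of its window by more than the allowed lateness. The default allowed lateness is 0L.
Setting an allowed lateness only has an effect on event-time windows.
- Example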
OutputTag<Bean> WMTag = new OutputTag<Bean>("w_m_tag", TypeInformation.of(Bean.class)) {};

// Window version
source.keyBy(Bean::getId)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5)) // maximum time the window keeps waiting for late elements
        .sideOutputLateData(WMTag)
        .reduce(new ReduceFunction<Bean>() {
            @Override
            public Bean reduce(Bean value1, Bean value2) throws Exception {
                System.out.println(value2);
                return value2;
            }
        });
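The drop rule can be written as a small predicate. The sketch below is plain Java, not Flink code: `AllowedLatenessSketch`, `Outcome` and `classify` are hypothetical names. It assumes that an event-time window `[start, end)` first fires when the watermark reaches `end - 1`, and that its state is discarded once the watermark reaches `end - 1 + allowedLateness`.

```java
/** Hypothetical helper that classifies an element by the state of its window. */
class AllowedLatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    static Outcome classify(long windowEndMillis, long allowedLatenessMillis, long watermark) {
        long windowMaxTimestamp = windowEndMillis - 1; // the window is [start, end)
        if (watermark < windowMaxTimestamp) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (watermark < windowMaxTimestamp + allowedLatenessMillis) {
            return Outcome.LATE_BUT_ACCEPTED; // already fired, but state is still kept
        }
        return Outcome.DROPPED; // window state is gone
    }

    public static void main(String[] args) {
        long windowEnd = 5_000L;
        long allowedLateness = 5_000L;
        for (long wm : new long[] {4_000L, 4_999L, 9_998L, 9_999L}) {
            System.out.println("watermark=" + wm + " -> " + classify(windowEnd, allowedLateness, wm));
        }
    }
}
```

With the default allowed lateness of 0, the middle case disappears: an element is either on time or dropped.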
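Large lateness - side output mechanism
- sideOutputLateData (window)
Sends late data to a side output.
This works the same way as a regular side output.
- Example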
OutputTag<Bean> WMTag = new OutputTag<Bean>("w_m_tag", TypeInformation.of(Bean.class)) {};

// Window version
source.keyBy(Bean::getId)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(5))
        .sideOutputLateData(WMTag) // late data goes to the side output
        .reduce(new ReduceFunction<Bean>() {
            @Override
            public Bean reduce(Bean value1, Bean value2) throws Exception {
                System.out.println(value2);
                return value2;
            }
        });
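As a rough illustration of the routing, the following plain-Java simulation sends each timestamp either to the main path or to a list that plays the role of the late-data side output. `LateDataSideOutputSketch` and its members are hypothetical; tumbling windows aligned to multiples of the window size are assumed, and the watermark is advanced by hand instead of being generated.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of routing too-late elements to a side output. */
class LateDataSideOutputSketch {
    final List<Long> mainOutput = new ArrayList<>();
    final List<Long> lateOutput = new ArrayList<>(); // stands in for the side output
    private final long windowSizeMillis;
    private final long allowedLatenessMillis;
    private long watermark = Long.MIN_VALUE;

    LateDataSideOutputSketch(long windowSizeMillis, long allowedLatenessMillis) {
        this.windowSizeMillis = windowSizeMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Watermarks never move backwards. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    void process(long eventTimestamp) {
        // Tumbling window that contains this timestamp: [windowStart, windowStart + size).
        long windowStart = eventTimestamp - Math.floorMod(eventTimestamp, windowSizeMillis);
        long windowMaxTimestamp = windowStart + windowSizeMillis - 1;
        if (windowMaxTimestamp + allowedLatenessMillis <= watermark) {
            lateOutput.add(eventTimestamp); // window already cleaned up: side output
        } else {
            mainOutput.add(eventTimestamp); // window still open, or within allowed lateness
        }
    }

    public static void main(String[] args) {
        LateDataSideOutputSketch sketch = new LateDataSideOutputSketch(5_000L, 5_000L);
        sketch.process(1_000L);
        sketch.advanceWatermark(12_000L);
        sketch.process(2_000L); // window [0, 5000) was cleaned up at watermark 9999
        sketch.process(8_000L); // window [5000, 10000) is kept until watermark 14999
        System.out.println("main=" + sketch.mainOutput + " late=" + sketch.lateOutput);
    }
}
```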
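Obtaining the event time
- withTimestampAssigner
Receives each upstream record and tells the watermark strategy which field to read the timestamp from. No reflection is involved: Flink calls the extractTimestamp method that you implement.
- Example
This mostly comes down to implementing the interface.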
.withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
    @Override
    public long extractTimestamp(Bean element, long recordTimestamp) {
        return element.getEventTime();
    }
}) // how the event timestamp is extracted from the record
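The assigner is an ordinary callback, and since the interface has a single method it can also be written as a lambda. The sketch below shows this in plain Java with a locally defined `TimestampAssigner` interface that only copies the method shape used above. The interface, the `Bean` class and its fields are stand-ins for illustration, not the Flink types.

```java
/** Hypothetical stand-ins showing that a timestamp assigner is a plain callback. */
class TimestampAssignerSketch {

    /** Stand-in for the document's Bean; only the event-time field matters here. */
    static final class Bean {
        private final String id;
        private final long eventTime;

        Bean(String id, long eventTime) {
            this.id = id;
            this.eventTime = eventTime;
        }

        String getId() {
            return id;
        }

        long getEventTime() {
            return eventTime;
        }
    }

    /** Local copy of the method shape; not the Flink interface. */
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    public static void main(String[] args) {
        // The lambda picks the field explicitly; nothing is looked up reflectively.
        TimestampAssigner<Bean> assigner = (element, recordTimestamp) -> element.getEventTime();
        Bean bean = new Bean("a", 1_700_000_000_000L);
        System.out.println(bean.getId() + " -> " + assigner.extractTimestamp(bean, Long.MIN_VALUE));
    }
}
```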
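Follow-up question
The sections above covered how the watermark mechanism works and how it deals with late data. But under the watermark propagation rule, a downstream operator collects the watermarks broadcast by all of its upstream parallel instances and uses the minimum of them as its own watermark. So if one upstream instance receives no new data for a long time, won't the downstream watermark stop advancing, so that downstream time windows never fire?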
idle
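- Yes
Yes, this scenario does exist. But Flink is a mature compute engine and should not have a gap like this. If we run into it, we are simply not using Flink correctly.
- Concept
If one of the input streams carries no data for a while, its WatermarkGenerator has nothing new to base a watermark on, and because the downstream watermark is the minimum across all input streams, it stops advancing. We then call this input idle. WatermarkStrategy provides the withIdleness method to handle this case.
In practice: if one partition has reached the window's trigger condition and the other partitions have had no data for the idle timeout we configured, the window fires. If data keeps arriving but the trigger condition is never reached, the window does not fire.
- Example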
.withIdleness(Duration.ofSeconds(5))
- Source code
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {

    private final WatermarkGenerator<T> watermarks;

    private final IdlenessTimer idlenessTimer;

    /**
     * Creates a new WatermarksWithIdleness generator that detects idleness of the given
     * generator with the given timeout.
     *
     * @param watermarks The original watermark generator.
     * @param idleTimeout The timeout for the idleness detection.
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }

    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
        idlenessTimer.activity();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }
}
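The effect of marking an input idle can be seen in a small plain-Java sketch of the "minimum over inputs" rule. `IdleInputSketch` and `combinedWatermark` are hypothetical names, and the sketch simplifies the real operator logic: it only shows that an idle input is left out of the minimum, so a silent partition no longer holds the downstream watermark back.

```java
import java.util.OptionalLong;

/** Hypothetical illustration of combining per-input watermarks while skipping idle inputs. */
class IdleInputSketch {

    /** Returns the minimum watermark over the active inputs, or empty if every input is idle. */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Input 0 has progressed to 9000; input 1 has never received data.
        long[] watermarks = {9_000L, Long.MIN_VALUE};
        System.out.println("no idleness:   " + combinedWatermark(watermarks, new boolean[] {false, false}));
        System.out.println("input 1 idle:  " + combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

Without idleness detection the combined value stays at the silent input's watermark. Once that input is marked idle, the active input alone drives the result.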
FlinkSQL
- How a watermark is defined in Flink SQL
// Create the table
tenv.executeSql(
        "CREATE TABLE t_kafka("
                + " shopId string, "
                + " shopName string,"
                // In Flink SQL the watermark must be defined on a timestamp(3) column,
                + " eventTime bigint, "
                // so the raw epoch-millisecond value has to be converted to a timestamp first.
                + " rt as to_timestamp_ltz(eventTime,3), "
                + " orderId string, "
                + " orderAmount bigint, "
                // `<time column> - INTERVAL '<n>' <time unit>` is the bounded-out-of-orderness strategy.
                + " WATERMARK FOR rt AS rt - INTERVAL '3' SECOND"
                // If the time field is not in the message value (the original note says: in the Kafka key),
                // it has to be read as a metadata column; see the examples in the official docs.
                // + " et timestamp(3) metadata from 'rowtime'"
                + ") WITH ( "
                + " 'connector' = 'kafka', "
                + " 'topic' = 'test', "
                + " 'properties.bootstrap.servers' = '192.168.110.128:9092', "
                + " 'properties.group.id' = 'testGroup', "
                + " 'scan.startup.mode' = 'earliest-offset', "
                + " 'format' = 'json', "
                + " 'json.fail-on-missing-field' = 'false', "
                + " 'json.ignore-parse-errors' = 'true' "
                + ") ");
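To see what the two computed expressions in the DDL amount to for a single row, here is a plain-Java sketch using `java.time`. `SqlWatermarkSketch` and its methods are hypothetical. It assumes `eventTime` holds epoch milliseconds, which is how `to_timestamp_ltz(eventTime, 3)` interprets it, and it computes only the per-row value of `rt - INTERVAL '3' SECOND`, not the watermark the running job eventually emits.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical per-row view of the rowtime and watermark expressions from the DDL. */
class SqlWatermarkSketch {

    /** Counterpart of to_timestamp_ltz(eventTime, 3): epoch milliseconds to an instant. */
    static Instant toRowtime(long eventTimeMillis) {
        return Instant.ofEpochMilli(eventTimeMillis);
    }

    /** Counterpart of `rt - INTERVAL '3' SECOND` when delay is three seconds. */
    static Instant watermarkCandidate(long eventTimeMillis, Duration delay) {
        return toRowtime(eventTimeMillis).minus(delay);
    }

    public static void main(String[] args) {
        long eventTime = 1_700_000_010_000L;
        System.out.println("rt        = " + toRowtime(eventTime));
        System.out.println("candidate = " + watermarkCandidate(eventTime, Duration.ofSeconds(3)));
    }
}
```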
- Inspect the table schema and print the data to see how the watermark behaves in the table
// Print the table schema
tenv.executeSql("desc t_kafka").print();
// See how the watermark shows up in Flink SQL
tenv.executeSql("select eventTime,CURRENT_WATERMARK(rt) as wmTime from t_kafka").print();
- Table to stream and stream to table
// Demo 2: watermark behavior after converting the table to a stream (the watermark is inherited)
// 1. The conversion takes a Table object, not a SQL string or a table name, so fetch the table first
Table t_kafka = tenv.from("t_kafka");
DataStream<Row> rowDataStream = tenv.toDataStream(t_kafka); // append-only (insert-only) stream
// tenv.toChangelogStream(t_kafka); // changelog (retract) stream: +U -U -D
SingleOutputStreamOperator<Row> process = rowDataStream.process(new ProcessFunction<Row, Row>() {
    @Override
    public void processElement(Row value, ProcessFunction<Row, Row>.Context ctx, Collector<Row> out) throws Exception {
        long l = ctx.timerService().currentWatermark();
        // System.out.println("watermark after table-to-stream conversion: " + l);
        out.collect(value);
    }
});
// process.print();
// A table created from a stream inherits no metadata from upstream, including the watermark; it has to be redefined
Table table = tenv.fromDataStream(process);
table.printSchema();