【建议收藏】Flink watermark分析实战(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【建议收藏】Flink watermark分析实战
  • 样例
.<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允许乱序的生成策略   最大时间时间-容错时间
  • 源码
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
 * the periodic interval length, plus the out-of-orderness bound.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;
    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;
    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    // ------------------------------------------------------------------------
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

中延迟 - 事件时间等待


  • allowedLateness(窗口)

设置允许元素延迟的时间。到达水印后超过指定时间的元素将被丢弃。默认情况下,允许的迟到时间为0L。

设置允许的迟到时间仅对事件时间窗口有效。

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))  //窗口最大等待时间
                .sideOutputLateData(WMTag)
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

大延迟 - 测输出流机制

  • sideOutputLateData(窗口)

将迟到数据,输入到测流

此处同测流输出

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(WMTag)  //迟到数据测流输出
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

eventime获取

  • withTimestampAssigner

传入上游数据的对象,通过反射,告诉watermark应该从哪个字段获取timestamp

  • 样例

此处主要为实现接口

.withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
    @Override
    public long extractTimestamp(Bean element, long recordTimestamp) {
        return element.getEventTime();
    }
})//watermark提取策略(从数据中)

问题延申

在上述过程中,我们讲了watermark机制是怎么工作的,也知道了他是怎么解决迟到数据的。但是根据watermark的刷新机制,下游获取到上游所有并发向下广播的watermark后,是对比所有watermark的最小值来做自身的watermark值的。那么如果碰到上游某一并发迟迟没有新数据进入,岂不是会导致自身的watermark一直没有更新,从而导致下游时间窗一直不触发?


idle

  • 是的

是的,上述场景是存在的。但flink已经是一个成熟的计算引擎了,他不应该存在这样的漏洞。如果有,那就是我们没用对

  • 概念

如果其中一个input stream中一直没有数据出现,WatermarkGenerator就无法生成watermark, 因为watermark取的是多个input stream中的最小值。这是我们称这个input是空闲的。watermarkStrategy提供了withIdleness方法处理这种情况。

其实就是当某个分区的窗口触发条件达到,并且其他的分区没有数据的情况下持续我们约定好的空闲时间,那么窗口会触发计算。如果一直有数据但是无法达到触发条件的话,窗口并不会触发计算。

  • 样例
.withIdleness(Duration.ofSeconds(5))
  • 源码
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final IdlenessTimer idlenessTimer;
    /**
        创建一个新的WatermarksWithIdleness生成器,用于在给定超时的情况下检测给定的生成器空闲状态。
        参数:
        watermarks–原始水印生成器。
        idleTimeout–空闲检测的超时。
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }
    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
        idlenessTimer.activity();
    }
        @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }
}

FlinkSQL

  • flink sql中的watermark定义方式
//建表
        tenv.executeSql( "CREATE TABLE t_kafka("
                +"    shopId string, "
                +"    shopName string,"
                +"    eventTime bigint, "//flink-sql中,watermark的数据来源字段需要是timestamp(3)类型
                +"    rt as to_timestamp_ltz(eventTime,3), "//所以我们想要使用他,必须要从数据中格式化时间
                +"    orderId string, "
                +"    orderAmount bigint, "
                +"    WATERMARK FOR rt AS rt - INTERVAL '3' SECOND" //watermark_Field-interval ‘int’ date_tpye为boundoutoforderness机制
//              +"    et timestamp(3) metadata from 'rowtime'"//如果你的时间字段,在kafka的key中,需要以元数据的形式进行提取,具体方式参考官网样例
                +") WITH (                                                   "
                +"  'connector' = 'kafka',                                   "
                +"  'topic' = 'test',                                        "
                +"  'properties.bootstrap.servers' = '192.168.110.128:9092',     "
                +"  'properties.group.id' = 'testGroup',                     "
                +"  'scan.startup.mode' = 'earliest-offset',                 "
                +"  'format' = 'json',                                       "
                +"  'json.fail-on-missing-field' = 'false',                  "
                +"  'json.ignore-parse-errors' = 'true'                      "
                +")                                                          "
        );
  • 通过查看表结构和打印数据来查看表中watermark的情况
//打印表结构
tenv.executeSql("desc  t_kafka").print();
//查看在flinksql中的watermark表现效果
env.executeSql("select eventTime,CURRENT_WATERMARK(rt) as wmTime from t_kafka").print();
  • 表转流与流转表
//演示二,表转流后的watermark表现效果--可以继承
        //1.表转流时,参数需要传入表而不是sql或者表名,所以需要先提取出表
        Table t_kafka = tenv.from("t_kafka");
        DataStream<Row> rowDataStream = tenv.toDataStream(t_kafka); //apply-only 流
//        tenv.toChangelogStream(t_kafka);//回撤流 +u -u -d
        SingleOutputStreamOperator<Row> process = rowDataStream.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row value, ProcessFunction<Row, Row>.Context ctx, Collector<Row> out) throws Exception {
                long l = ctx.timerService().currentWatermark();
//                System.out.println("表转流时的watermark:" + l);
                out.collect(value);
            }
        });
//        process.print();
        Table table = tenv.fromDataStream(process);  //当由流转表时,此表不会从上层继承任何元数据信息,包括watermark,需要重新定义
        table.printSchema();


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
7月前
|
分布式计算 Hadoop 大数据
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)
|
17天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
2月前
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
3月前
|
消息中间件 Kafka API
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
|
4月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46950 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
5月前
|
消息中间件 监控 Java
一次线上Flink 背压情况分析之重新认识java dump 文件
一次线上Flink 背压情况分析之重新认识java dump 文件
77 0
|
5月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
67 0
|
5月前
|
消息中间件 大数据 Kafka
Flink史上最简单双十一实时分析案例
Flink史上最简单双十一实时分析案例
40 0
|
6月前
|
资源调度 Java 调度
Flink教程(05)- Flink原理简单分析
Flink教程(05)- Flink原理简单分析
69 0