【建议收藏】Flink watermark分析实战(下)

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 【建议收藏】Flink watermark分析实战
  • 样例
.<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允许乱序的生成策略   最大时间时间-容错时间
  • 源码
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
 * the periodic interval length, plus the out-of-orderness bound.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;
    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;
    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    // ------------------------------------------------------------------------
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

中延迟 - 事件时间等待


  • allowedLateness(窗口)

设置允许元素延迟的时间。到达水印后超过指定时间的元素将被丢弃。默认情况下,允许的迟到时间为0L。

设置允许的迟到时间仅对事件时间窗口有效。

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))  //窗口最大等待时间
                .sideOutputLateData(WMTag)
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

大延迟 - 测输出流机制

  • sideOutputLateData(窗口)

将迟到数据,输入到测流

此处同测流输出

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(WMTag)  //迟到数据测流输出
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

eventime获取

  • withTimestampAssigner

传入上游数据的对象,通过反射,告诉watermark应该从哪个字段获取timestamp

  • 样例

此处主要为实现接口

.withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
    @Override
    public long extractTimestamp(Bean element, long recordTimestamp) {
        return element.getEventTime();
    }
})//watermark提取策略(从数据中)

问题延申

在上述过程中,我们讲了watermark机制是怎么工作的,也知道了他是怎么解决迟到数据的。但是根据watermark的刷新机制,下游获取到上游所有并发向下广播的watermark后,是对比所有watermark的最小值来做自身的watermark值的。那么如果碰到上游某一并发迟迟没有新数据进入,岂不是会导致自身的watermark一直没有更新,从而导致下游时间窗一直不触发?


idle

  • 是的

是的,上述场景是存在的。但flink已经是一个成熟的计算引擎了,他不应该存在这样的漏洞。如果有,那就是我们没用对

  • 概念

如果其中一个input stream中一直没有数据出现,WatermarkGenerator就无法生成watermark, 因为watermark取的是多个input stream中的最小值。这是我们称这个input是空闲的。watermarkStrategy提供了withIdleness方法处理这种情况。

其实就是当某个分区的窗口触发条件达到,并且其他的分区没有数据的情况下持续我们约定好的空闲时间,那么窗口会触发计算。如果一直有数据但是无法达到触发条件的话,窗口并不会触发计算。

  • 样例
.withIdleness(Duration.ofSeconds(5))
  • 源码
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final IdlenessTimer idlenessTimer;
    /**
        创建一个新的WatermarksWithIdleness生成器,用于在给定超时的情况下检测给定的生成器空闲状态。
        参数:
        watermarks–原始水印生成器。
        idleTimeout–空闲检测的超时。
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }
    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
        idlenessTimer.activity();
    }
        @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }
}

FlinkSQL

  • flink sql中的watermark定义方式
//建表
        tenv.executeSql( "CREATE TABLE t_kafka("
                +"    shopId string, "
                +"    shopName string,"
                +"    eventTime bigint, "//flink-sql中,watermark的数据来源字段需要是timestamp(3)类型
                +"    rt as to_timestamp_ltz(eventTime,3), "//所以我们想要使用他,必须要从数据中格式化时间
                +"    orderId string, "
                +"    orderAmount bigint, "
                +"    WATERMARK FOR rt AS rt - INTERVAL '3' SECOND" //watermark_Field-interval ‘int’ date_tpye为boundoutoforderness机制
//              +"    et timestamp(3) metadata from 'rowtime'"//如果你的时间字段,在kafka的key中,需要以元数据的形式进行提取,具体方式参考官网样例
                +") WITH (                                                   "
                +"  'connector' = 'kafka',                                   "
                +"  'topic' = 'test',                                        "
                +"  'properties.bootstrap.servers' = '192.168.110.128:9092',     "
                +"  'properties.group.id' = 'testGroup',                     "
                +"  'scan.startup.mode' = 'earliest-offset',                 "
                +"  'format' = 'json',                                       "
                +"  'json.fail-on-missing-field' = 'false',                  "
                +"  'json.ignore-parse-errors' = 'true'                      "
                +")                                                          "
        );
  • 通过查看表结构和打印数据来查看表中watermark的情况
//打印表结构
tenv.executeSql("desc  t_kafka").print();
//查看在flinksql中的watermark表现效果
env.executeSql("select eventTime,CURRENT_WATERMARK(rt) as wmTime from t_kafka").print();
  • 表转流与流转表
//演示二,表转流后的watermark表现效果--可以继承
        //1.表转流时,参数需要传入表而不是sql或者表名,所以需要先提取出表
        Table t_kafka = tenv.from("t_kafka");
        DataStream<Row> rowDataStream = tenv.toDataStream(t_kafka); //apply-only 流
//        tenv.toChangelogStream(t_kafka);//回撤流 +u -u -d
        SingleOutputStreamOperator<Row> process = rowDataStream.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row value, ProcessFunction<Row, Row>.Context ctx, Collector<Row> out) throws Exception {
                long l = ctx.timerService().currentWatermark();
//                System.out.println("表转流时的watermark:" + l);
                out.collect(value);
            }
        });
//        process.print();
        Table table = tenv.fromDataStream(process);  //当由流转表时,此表不会从上层继承任何元数据信息,包括watermark,需要重新定义
        table.printSchema();


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
328 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
164 11
|
10月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
670 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
11月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
213 5
|
11月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
190 0
|
6月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
203 5
|
9月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1091 2
探索Flink动态CEP:杭州银行的实战案例
|
10月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
959 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
10月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
11月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
121 0