【建议收藏】Flink watermark分析实战(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【建议收藏】Flink watermark分析实战
  • 样例
.<Bean>forBoundedOutOfOrderness(Duration.ofMillis(0))  //允许乱序的生成策略   最大时间时间-容错时间
  • 源码
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A WatermarkGenerator for situations where records are out of order, but you can place an upper
 * bound on how far the events are out of order. An out-of-order bound B means that once an event
 * with timestamp T was encountered, no events older than {@code T - B} will follow any more.
 *
 * <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
 * the periodic interval length, plus the out-of-orderness bound.
 */
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;
    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;
    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    // ------------------------------------------------------------------------
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

中延迟 - 事件时间等待


  • allowedLateness(窗口)

设置允许元素延迟的时间。到达水印后超过指定时间的元素将被丢弃。默认情况下,允许的迟到时间为0L。

设置允许的迟到时间仅对事件时间窗口有效。

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))  //窗口最大等待时间
                .sideOutputLateData(WMTag)
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

大延迟 - 测输出流机制

  • sideOutputLateData(窗口)

将迟到数据,输入到测流

此处同测流输出

  • 样例
OutputTag<Bean> WMTag = new OutputTag("w_m_tag", TypeInformation.of(Bean.class)) {
        };
        //窗口版
         source.keyBy(Bean::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(WMTag)  //迟到数据测流输出
                .reduce(new ReduceFunction<Bean>() {
                    @Override
                    public Bean reduce(Bean value1, Bean value2) throws Exception {
                        System.out.println(value2);
                        return value2;
                    }
                })
                ;

eventime获取

  • withTimestampAssigner

传入上游数据的对象,通过反射,告诉watermark应该从哪个字段获取timestamp

  • 样例

此处主要为实现接口

.withTimestampAssigner(new SerializableTimestampAssigner<Bean>() {
    @Override
    public long extractTimestamp(Bean element, long recordTimestamp) {
        return element.getEventTime();
    }
})//watermark提取策略(从数据中)

问题延申

在上述过程中,我们讲了watermark机制是怎么工作的,也知道了他是怎么解决迟到数据的。但是根据watermark的刷新机制,下游获取到上游所有并发向下广播的watermark后,是对比所有watermark的最小值来做自身的watermark值的。那么如果碰到上游某一并发迟迟没有新数据进入,岂不是会导致自身的watermark一直没有更新,从而导致下游时间窗一直不触发?


idle

  • 是的

是的,上述场景是存在的。但flink已经是一个成熟的计算引擎了,他不应该存在这样的漏洞。如果有,那就是我们没用对

  • 概念

如果其中一个input stream中一直没有数据出现,WatermarkGenerator就无法生成watermark, 因为watermark取的是多个input stream中的最小值。这是我们称这个input是空闲的。watermarkStrategy提供了withIdleness方法处理这种情况。

其实就是当某个分区的窗口触发条件达到,并且其他的分区没有数据的情况下持续我们约定好的空闲时间,那么窗口会触发计算。如果一直有数据但是无法达到触发条件的话,窗口并不会触发计算。

  • 样例
.withIdleness(Duration.ofSeconds(5))
  • 源码
public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final IdlenessTimer idlenessTimer;
    /**
        创建一个新的WatermarksWithIdleness生成器,用于在给定超时的情况下检测给定的生成器空闲状态。
        参数:
        watermarks–原始水印生成器。
        idleTimeout–空闲检测的超时。
     */
    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }
    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
        idlenessTimer.activity();
    }
        @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (idlenessTimer.checkIfIdle()) {
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }
}

FlinkSQL

  • flink sql中的watermark定义方式
//建表
        tenv.executeSql( "CREATE TABLE t_kafka("
                +"    shopId string, "
                +"    shopName string,"
                +"    eventTime bigint, "//flink-sql中,watermark的数据来源字段需要是timestamp(3)类型
                +"    rt as to_timestamp_ltz(eventTime,3), "//所以我们想要使用他,必须要从数据中格式化时间
                +"    orderId string, "
                +"    orderAmount bigint, "
                +"    WATERMARK FOR rt AS rt - INTERVAL '3' SECOND" //watermark_Field-interval ‘int’ date_tpye为boundoutoforderness机制
//              +"    et timestamp(3) metadata from 'rowtime'"//如果你的时间字段,在kafka的key中,需要以元数据的形式进行提取,具体方式参考官网样例
                +") WITH (                                                   "
                +"  'connector' = 'kafka',                                   "
                +"  'topic' = 'test',                                        "
                +"  'properties.bootstrap.servers' = '192.168.110.128:9092',     "
                +"  'properties.group.id' = 'testGroup',                     "
                +"  'scan.startup.mode' = 'earliest-offset',                 "
                +"  'format' = 'json',                                       "
                +"  'json.fail-on-missing-field' = 'false',                  "
                +"  'json.ignore-parse-errors' = 'true'                      "
                +")                                                          "
        );
  • 通过查看表结构和打印数据来查看表中watermark的情况
//打印表结构
tenv.executeSql("desc  t_kafka").print();
//查看在flinksql中的watermark表现效果
env.executeSql("select eventTime,CURRENT_WATERMARK(rt) as wmTime from t_kafka").print();
  • 表转流与流转表
//演示二,表转流后的watermark表现效果--可以继承
        //1.表转流时,参数需要传入表而不是sql或者表名,所以需要先提取出表
        Table t_kafka = tenv.from("t_kafka");
        DataStream<Row> rowDataStream = tenv.toDataStream(t_kafka); //apply-only 流
//        tenv.toChangelogStream(t_kafka);//回撤流 +u -u -d
        SingleOutputStreamOperator<Row> process = rowDataStream.process(new ProcessFunction<Row, Row>() {
            @Override
            public void processElement(Row value, ProcessFunction<Row, Row>.Context ctx, Collector<Row> out) throws Exception {
                long l = ctx.timerService().currentWatermark();
//                System.out.println("表转流时的watermark:" + l);
                out.collect(value);
            }
        });
//        process.print();
        Table table = tenv.fromDataStream(process);  //当由流转表时,此表不会从上层继承任何元数据信息,包括watermark,需要重新定义
        table.printSchema();


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
274 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
|
5月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
116 5
|
5月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
108 0
|
3月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
584 2
探索Flink动态CEP:杭州银行的实战案例
|
7月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
415 2
|
4月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
674 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
5月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
64 0
|
5月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
124 0
|
7月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
119 1

热门文章

最新文章