Flink之Time与Watermark(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)时间分类


在Flink的流式处理中,会涉及到时间的不同概念,如下图所示

事件时间EventTime: 事件真真正正发生产生的时间

摄入时间IngestionTime: 事件到达Flink的时间

处理时间ProcessingTime: 事件真正被处理/计算的时间


上面的三个时间,我们更关注事件时间EventTime


10.png


(2)Watermark详解


(2.1)Watermark图解

11.png12.png


(2.2)什么是Watermark?

Watermark就是给数据再额外的加的一个时间列也就是Watermark是个时间戳!


(2.3)如何计算Watermark?

Watermark =当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间


这样可以保证Watermark水位线会一直上升(变大),不会下降


(2.4)Watermark有什么用?

之前的窗口都是按照系统时间来触发计算的,如:[10:00:00~10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!那么现在有了Watermark,窗口就可以按照Watermark来触发计算!


也就是说Watermark是用来触发窗口计算的!


(2.5)Watermark如何出发窗口计算?

窗口计算的触发条件为:


窗口中有数据

Watermaker >= 窗口的结束时间

注意:

上面的触发公式进行如下变形:

Watermaker >= 窗口的结束时间
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >=  窗口的结束时间
当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间

Watermark API:


https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event_timestamps_watermarks.html


(3)EventTime 和 WaterMark 的使用


Flink 内置了两个 WaterMark 生成器:


1.Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为 0)

WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
  1. Fixed Amount of Lateness(允许固定时间的延迟)
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))

(3.1)基于事件时间的滚动窗口测试watermark机制

代码开发:

package com.aikfk.flink.datastream.bean;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/20 9:19 下午
 * 水位传感器:用于接收水位数据
 * <p>
 * id:传感器编号
 * ts:时间戳
 * vc:水位
 */
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;
    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public Long getTs() {
        return ts;
    }
    public void setTs(Long ts) {
        this.ts = ts;
    }
    public Integer getVc() {
        return vc;
    }
    public void setVc(Integer vc) {
        this.vc = vc;
    }
    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
}
package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:基于事件事件滚动窗口测试watermark机制
 * @date :2021/3/20 9:21 下午
 */
public class EventTimeTumbling {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3.提取数据中的时间戳字段
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 设置最大允许的延迟时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 指定事时间件列
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));
        // 4.按照id分组
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5.开窗
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 6.计算总和
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7.打印
        result.print();
        // 8.执行任务
        env.execute();
    }
}

测试非乱序数据:

ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844003,1
ws_001,1577844005,1
ws_001,1577844006,1
ws_001,1577844009,1

行结果:

WaterSensor{id='ws_001', ts=1577844001, vc=3}

运行过程解释:

因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),当输入的数据事件时间为t(比如是9秒),假设设置最大允许的延迟时间为2秒,即watermark为7秒,而wm >= 窗口最大边界值5秒,所以触发[0,5)的窗口,得到的结果为vc = 3



测试乱序数据:13.png

ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844003,1
ws_001,1577844005,1
ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844009,1

运行结果:

WaterSensor{id='ws_001', ts=1577844001, vc=5}

运行过程解释:


因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),后面来了第5秒的数据,落入的窗口为[5,10),再后面又来了1,2秒的数据,为迟到的数据,因为在来第5秒数据的时候,wm为3秒,它是小于窗口的边界值,所以[0,5)窗口没有关闭,因此来的1,2秒数据会落入到[0,5)窗口中。当输入的数据事件时间为t(比如是9秒),假设设置最大允许的延迟时间为2秒,即watermark为7秒,而wm >= 窗口最大边界值5秒,所以触发[0,5)的窗口,得到的结果为vc = 5.

14.png

(3.2)基于事件时间的滚动窗口测试允许迟到数据(allowedLateness)机制与侧输出流(sideOutput)

已经添加了 wartemark 之后, 仍有数据会迟到怎么办? Flink 的窗口, 也允许迟到数据.


当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条 迟到数据, 则触发一次这条数据所在窗口计算(增量计算).


那么什么时候会真正的关闭窗口呢? wartermark 超过了 窗口结束时间+等待时间

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

注意:允许迟到只能运用在 event time 上

允许迟到数据, 窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink 提供了一种叫做侧输出流的来处理关窗之后到达的数据.

代码开发:

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:基于事件事件滚动窗口测试watermark机制
 * @date :2021/3/20 9:21 下午
 */
public class LateAndSideOutPut {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3.提取数据中的时间戳字段
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 设置最大允许的延迟时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 指定事时间件列
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));
        // 4.按照id分组
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5.开窗,允许迟到数据,侧输出流
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(2))
                .sideOutputLateData(new OutputTag<WaterSensor>("Side") {
                });
        // 6.计算总和
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        DataStream<WaterSensor> sideOutput = result.getSideOutput(new OutputTag<WaterSensor>("Side") {
        });
        // 7.打印
        result.print();
        sideOutput.print("Side");
        // 8.执行任务
        env.execute();
    }
}

测试数据:

ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844003,1
ws_001,1577844008,1
ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844003,1
ws_001,1577844009,1
ws_001,1577844001,1
ws_001,1577844002,1

运行结果:

WaterSensor{id='ws_001', ts=1577844001, vc=3}
WaterSensor{id='ws_001', ts=1577844001, vc=4}
WaterSensor{id='ws_001', ts=1577844001, vc=5}
WaterSensor{id='ws_001', ts=1577844001, vc=6}
Side> WaterSensor{id='ws_001', ts=1577844001, vc=1}
Side> WaterSensor{id='ws_001', ts=1577844002, vc=1}

运行过程解释:


因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),后面来了第8秒的数据,假设设置最大允许的延迟时间为2秒 ,此时的wm = 6秒大于窗口的最大边界值,触发窗口计算,所以输入第8秒的数据会得到vc=3,但是由于添加了允许迟到数据(allowedLateness)机制,设置允许迟到时间是2秒,因此窗口并没有关闭,而是持续到了wm = 7秒,后面来了1,2,3秒的迟到数据,还会落入到[0,5)窗口中,但是是来一条迟到数据则触发一次这条数据所在窗口计算(增量计算)。当输入的数据事件时间为t(比如是9秒),即watermark为7秒,而wm >= 窗口结束时间+等待时间,窗口关闭,后面再来的1,2秒迟到数据就不会落入到[0,5)窗口中,即通过侧输出流来处理关窗之后到达的数据。15.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
70 0
|
1月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
34 0
|
3月前
|
数据安全/隐私保护 流计算
Flink四大基石——2.Time
Flink四大基石——2.Time
42 1
|
3月前
|
监控 Apache 流计算
时间的守卫者:揭秘Flink中Watermark如何掌控数据流的时空秩序?
【8月更文挑战第26天】Apache Flink是一款功能强大的流处理框架,其Watermark机制为核心,确保了系统即使面对数据乱序或延迟也能准确处理时间相关的特性。Watermark作为一种特殊事件,标记了所有在此之前发生事件的最晚时间点,这对于时间窗口操作至关重要。
55 0
|
6月前
|
数据处理 Apache 流计算
Flink Watermark和时间语义
Flink Watermark和时间语义
|
6月前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
462 0
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
137 0
|
6月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
90 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
21天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
756 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎