Flink Time and Watermark (Part 2)


(3.3) Testing the watermark mechanism with event-time sliding windows

Code:

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description: Tests the watermark mechanism with event-time sliding windows
 * @date :2021/3/20 9:21 PM
 */
public class EventTimeSliding {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them into WaterSensor JavaBeans
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Set the maximum allowed out-of-orderness (delay)
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // Specify the event-time field
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                // ts is in seconds; Flink event time is in milliseconds
                                return element.getTs() * 1000L;
                            }
                        }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Apply a sliding event-time window: size 6 s, slide 2 s
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(2)));
        // 6. Sum vc within each window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
}

Test data:

ws_001,1577844001,1
ws_001,1577844008,1
ws_001,1577844012,1

Output:

WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844008, vc=1}

Explanation:

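The program uses a sliding window of size 6 s and slide 2 s (seconds below are counted from 1577844000, which is a multiple of both 2 and 6, so window boundaries fall on these round offsets). An event at second 1 belongs to three windows: [-4,2), [-2,4) and [0,6). When the event at second 8 arrives, the watermark becomes 8 - 2 = 6 s (strictly 5999 ms, because the built-in bounded-out-of-orderness strategy subtracts an extra 1 ms), which reaches the end of [0,6) (its last timestamp is 5999 ms). That window closes, and the three windows holding the first event all fire together, which is why three results are printed at once. The event at second 8 belongs to [4,10), [6,12) and [8,14). To get one result for it, input an event at second 12: the watermark becomes 10 s (9999 ms), which reaches the end of [4,10), so that window fires and produces one result.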
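The window assignment and firing rule described above can be checked without a cluster. The following is a minimal pure-Java sketch, not Flink code: the class and method names are illustrative, times are millisecond offsets from 1577844000 s, and it assumes Flink's built-in watermark formula (max timestamp - 2000 ms - 1 ms) plus the rule that a window fires once the watermark reaches its last timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative replay of the sliding-window test (size 6 s, slide 2 s, delay 2 s). */
final class SlidingWindowReplay {
    static final long SIZE = 6_000L, SLIDE = 2_000L, DELAY = 2_000L;

    /** Start times of every sliding window [start, start + SIZE) that contains ts. */
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE); // latest window start <= ts
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    /** Assumed bounded-out-of-orderness watermark after seeing maxTs. */
    static long watermarkAfter(long maxTs) {
        return maxTs - DELAY - 1;
    }

    /** A window fires once the watermark reaches its max timestamp (end - 1). */
    static boolean fires(long start, long watermark) {
        return watermark >= start + SIZE - 1;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(1_000L));             // [0, -2000, -4000]
        long wm = watermarkAfter(8_000L);                      // after the 8 s event
        System.out.println(wm + " fires [0,6)? " + fires(0L, wm));       // 5999 fires [0,6)? true
        wm = watermarkAfter(12_000L);                          // after the 12 s event
        System.out.println(wm + " fires [4,10)? " + fires(4_000L, wm));  // 9999 fires [4,10)? true
        System.out.println("fires [6,12)? " + fires(6_000L, wm));        // fires [6,12)? false
    }
}
```

Working in offsets keeps the numbers readable; it is valid here only because 1577844000000 ms is itself a multiple of both the slide and the size.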


(3.4) Testing the watermark mechanism with event-time session windows

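Session gap: a key's session window closes once the watermark has moved at least one gap past the timestamp of the session's last event. The boundary is inclusive: an event that arrives exactly one gap after the previous one still joins the same session.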
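A small sketch of the inclusive gap rule, assuming (as Flink's session assigner does) that each event first gets its own window [ts, ts + gap) and that windows which overlap or merely touch are merged. The class and method names are hypothetical; times are millisecond offsets.

```java
/** Illustrative check of when two consecutive events share a session. */
final class SessionGapRule {
    /** True if [aStart, aEnd) and [bStart, bEnd) overlap or touch, so they would be merged. */
    static boolean mergeable(long aStart, long aEnd, long bStart, long bEnd) {
        return aStart <= bEnd && aEnd >= bStart;
    }

    /** Consecutive events of one key share a session when they are at most one gap apart. */
    static boolean sameSession(long prevTs, long ts, long gap) {
        return mergeable(prevTs, prevTs + gap, ts, ts + gap);
    }

    public static void main(String[] args) {
        long gap = 5_000L;
        System.out.println(sameSession(2_000L, 7_000L, gap));  // true: exactly one gap apart
        System.out.println(sameSession(7_000L, 14_000L, gap)); // false: 7 s is more than the gap
    }
}
```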


Code:

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description: Tests the watermark mechanism with event-time session windows
 * @date :2021/3/20 9:21 PM
 */
public class EventTimeSession {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them into WaterSensor JavaBeans
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Set the maximum allowed out-of-orderness (delay)
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // Specify the event-time field
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                // ts is in seconds; Flink event time is in milliseconds
                                return element.getTs() * 1000L;
                            }
                        }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Apply an event-time session window with a 5 s gap (events exactly 5 s apart still share a session)
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        // 6. Sum vc within each window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
}

Test data:

ws_001,1577844002,1
ws_001,1577844007,1
ws_001,1577844014,1


Output:

WaterSensor{id='ws_001', ts=1577844002, vc=2}

Explanation:

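The program uses a session window with a gap of 5 s. The first event is at second 2 and the second at second 7; they are exactly one gap apart, so they fall into the same session, [2,12). Neither input fires the window, because a session closes only when the watermark reaches the last event time plus the gap (7 + 5 = 12). When the event at second 14 arrives, the watermark becomes 14 - 2 = 12 s (strictly 11999 ms, the session's last timestamp), so the window fires and emits a single result whose vc sums the two events (vc=2). The event at second 14 starts a new session.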
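The run above can be replayed in plain Java. This sketch is illustrative only: it assumes one key, in-order input, a watermark of max timestamp - 2000 ms - 1 ms recomputed after every event (in the real job the periodic watermark is emitted on a timer, which amounts to the same thing when lines are typed by hand), and it simply sums vc per session. Names are hypothetical; times are millisecond offsets from 1577844000 s.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative replay of the session-window test (gap 5 s, delay 2 s). */
final class SessionReplay {
    static final long GAP = 5_000L, DELAY = 2_000L;

    /** Returns one "start-end:sum" entry per session fired while replaying the events. */
    static List<String> replay(long[] ts, int[] vc) {
        List<long[]> open = new ArrayList<>(); // each entry: {start, end, vcSum}
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < ts.length; i++) {
            long[] last = open.isEmpty() ? null : open.get(open.size() - 1);
            if (last != null && ts[i] <= last[1]) {
                // [ts, ts + GAP) touches or overlaps the latest session: merge into it
                last[1] = Math.max(last[1], ts[i] + GAP);
                last[2] += vc[i];
            } else {
                open.add(new long[] {ts[i], ts[i] + GAP, vc[i]});
            }
            maxTs = Math.max(maxTs, ts[i]);
            long wm = maxTs - DELAY - 1;
            // Fire every session whose max timestamp (end - 1) the watermark has reached
            open.removeIf(s -> {
                if (wm >= s[1] - 1) {
                    fired.add(s[0] + "-" + s[1] + ":" + s[2]);
                    return true;
                }
                return false;
            });
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(replay(new long[] {2_000L, 7_000L, 14_000L}, new int[] {1, 1, 1}));
        // [2000-12000:2]
    }
}
```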


(4) Custom WatermarkStrategy


There are two styles of watermark generation: periodic and punctuated.

Both are written by implementing the WatermarkGenerator interface.
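To make the difference concrete, here is a Flink-free sketch. MiniGenerator is a hypothetical stand-in for WatermarkGenerator (its two methods mirror onEvent and onPeriodicEmit), and a List<Long> replaces WatermarkOutput. The periodic style only records the maximum timestamp and emits when the framework's timer calls onPeriodicEmit; the punctuated style emits directly from onEvent.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for WatermarkGenerator; "out" collects emitted watermark values. */
interface MiniGenerator {
    void onEvent(long ts, List<Long> out);
    void onPeriodicEmit(List<Long> out);
}

/** Periodic style: remember the max timestamp, emit only when the timer fires. */
final class PeriodicGen implements MiniGenerator {
    private final long delay;
    private long maxTs;
    PeriodicGen(long delay) { this.delay = delay; this.maxTs = Long.MIN_VALUE + delay + 1; }
    public void onEvent(long ts, List<Long> out) { maxTs = Math.max(maxTs, ts); }
    public void onPeriodicEmit(List<Long> out) { out.add(maxTs - delay); }
}

/** Punctuated style: emit right after each event; the timer callback does nothing. */
final class PunctuatedGen implements MiniGenerator {
    private final long delay;
    private long maxTs;
    PunctuatedGen(long delay) { this.delay = delay; this.maxTs = Long.MIN_VALUE + delay + 1; }
    public void onEvent(long ts, List<Long> out) { maxTs = Math.max(maxTs, ts); out.add(maxTs - delay); }
    public void onPeriodicEmit(List<Long> out) { }
}

final class GeneratorStyles {
    /** Feeds all events, then optionally simulates one timer tick. */
    static List<Long> run(MiniGenerator g, long[] events, boolean tick) {
        List<Long> out = new ArrayList<>();
        for (long ts : events) g.onEvent(ts, out);
        if (tick) g.onPeriodicEmit(out);
        return out;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 2_000L, 12_000L};
        System.out.println(run(new PeriodicGen(2_000L), events, true));   // [10000]
        System.out.println(run(new PunctuatedGen(2_000L), events, true)); // [-1000, 0, 10000]
    }
}
```

Punctuated generation reacts immediately but emits one watermark per event, which can be costly at high throughput; periodic generation batches that work into a fixed interval.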


(4.1) Periodic

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * @author :caizhengjie
 * @description: Tests the watermark mechanism with event-time tumbling windows and a custom periodic watermark generator
 * @date :2021/3/20 9:21 PM
 */
public class EventTimeTumblingCustomerPeriod {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them into WaterSensor JavaBeans
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field and generate watermarks with the custom periodic generator
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        // Allow events to be up to 2000 ms out of order
                        return new MyPeriod(2000L);
                    }
                }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // ts is in seconds; Flink event time is in milliseconds
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Apply a 5 s tumbling event-time window
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 6. Sum vc within each window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
    /**
     * Custom periodic watermark generator
     */
    public static class MyPeriod implements WatermarkGenerator<WaterSensor> {
        private Long maxTs;
        // Maximum allowed delay (out-of-orderness), in ms
        private Long maxDelay;
        public MyPeriod(Long maxDelay) {
            this.maxDelay = maxDelay;
            // Start low enough that the first watermark (maxTs - maxDelay) cannot underflow
            this.maxTs = Long.MIN_VALUE + maxDelay + 1;
        }
        // Called once for every element; tracks the largest timestamp, on which the watermark is based
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            // Keep the largest timestamp seen so far
            System.out.println("Updating max timestamp");
            maxTs = Math.max(eventTimestamp, maxTs);
        }
        // Called periodically to emit the watermark; the default interval is 200 ms
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // In effect, the event-time clock runs one maximum delay behind the largest timestamp seen
            System.out.println("Generated watermark: " + (maxTs - maxDelay));
            output.emitWatermark(new Watermark(maxTs - maxDelay));
        }
    }
}
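One detail in MyPeriod deserves a note: maxTs starts at Long.MIN_VALUE + maxDelay + 1 rather than Long.MIN_VALUE (Flink's built-in bounded-out-of-orderness generator uses a similar starting value). The plain-Java sketch below, with illustrative names, shows why: subtracting the delay from a bare Long.MIN_VALUE overflows to a huge positive number, and a watermark that large would make every event-time window appear complete before any data arrives.

```java
/** Illustrative demonstration of the initial-watermark underflow that MyPeriod avoids. */
final class WatermarkUnderflow {
    /** Naive start: no event seen yet, so maxTs = Long.MIN_VALUE; the subtraction wraps around. */
    static long naiveInitialWatermark(long delay) {
        long maxTs = Long.MIN_VALUE;
        return maxTs - delay;
    }

    /** Guarded start used by MyPeriod: the result stays at Long.MIN_VALUE + 1. */
    static long guardedInitialWatermark(long delay) {
        long maxTs = Long.MIN_VALUE + delay + 1;
        return maxTs - delay;
    }

    public static void main(String[] args) {
        System.out.println(naiveInitialWatermark(2_000L));   // 9223372036854773808
        System.out.println(guardedInitialWatermark(2_000L)); // -9223372036854775807
    }
}
```

Because onPeriodicEmit runs on a timer (200 ms by default, adjustable with env.getConfig().setAutoWatermarkInterval(...)), MyPeriod prints its "Generated watermark" line continuously, even while no data arrives.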

(4.2) Punctuated

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * @author :caizhengjie
 * @description: Tests the watermark mechanism with event-time tumbling windows and a custom punctuated watermark generator
 * @date :2021/3/20 9:21 PM
 */
public class EventTimeTumblingCustomerPunt {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them into WaterSensor JavaBeans
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field and generate watermarks with the custom punctuated generator
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new MyPunt(2000L);
                    }
                }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Apply a 5 s tumbling event-time window
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 6. Sum vc within each window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
    /**
     * Custom punctuated watermark generator
     */
    public static class MyPunt implements WatermarkGenerator<WaterSensor> {
        private Long maxTs;
        // Maximum allowed delay (out-of-orderness), in ms
        private Long maxDelay;
        public MyPunt(Long maxDelay) {
            this.maxDelay = maxDelay;
            this.maxTs = Long.MIN_VALUE + maxDelay + 1;
        }
        // Called for every incoming element: a punctuated generator emits the watermark right here
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("Updating max timestamp");
            maxTs = Math.max(eventTimestamp, maxTs);
            output.emitWatermark(new Watermark(maxTs - maxDelay));
        }
        // Called periodically; nothing to do for a punctuated generator
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }
}

Test data:

ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844012,1

Output:

Updating max timestamp
Updating max timestamp
Updating max timestamp
WaterSensor{id='ws_001', ts=1577844001, vc=2}
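The result above (one window with vc=2, emitted only after the third record) can be replayed in plain Java. This sketch assumes a single key, 5 s tumbling windows aligned to offsets from 1577844000 s (a multiple of 5), and a watermark of max timestamp - 2000 ms emitted right after each event, as MyPunt does (unlike the built-in strategy, no extra 1 ms is subtracted). It ignores late data, and the names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative replay of the punctuated test: 5 s tumbling windows, delay 2 s. */
final class PunctuatedTumblingReplay {
    static final long SIZE = 5_000L, DELAY = 2_000L;

    /** Returns one "[start,end):sum" entry per window fired during the replay. */
    static List<String> replay(long[] ts, int[] vc) {
        TreeMap<Long, Integer> sums = new TreeMap<>(); // window start -> running vc sum
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + DELAY + 1;
        for (int i = 0; i < ts.length; i++) {
            long start = ts[i] - Math.floorMod(ts[i], SIZE);
            sums.merge(start, vc[i], Integer::sum);   // the element reaches its window first
            maxTs = Math.max(maxTs, ts[i]);
            long wm = maxTs - DELAY;                  // then the punctuated watermark follows
            // Fire every window whose max timestamp (start + SIZE - 1) the watermark has reached
            while (!sums.isEmpty() && sums.firstKey() + SIZE - 1 <= wm) {
                Map.Entry<Long, Integer> w = sums.pollFirstEntry();
                fired.add("[" + w.getKey() + "," + (w.getKey() + SIZE) + "):" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(replay(new long[] {1_000L, 2_000L, 12_000L}, new int[] {1, 1, 1}));
        // [[0,5000):2]
    }
}
```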


(5) Watermark propagation with multiple parallel instances


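How watermarks propagate:

  1. Watermarks are broadcast to every downstream parallel instance.
  2. The watermark of a parallel instance is the minimum of the watermarks it has received from all of its upstream parallel instances.
  3. If the watermark does not increase, it is not forwarded downstream. Note that generation itself is unaffected: the generator keeps producing watermarks, and only values that do not advance are held back.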
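The rules above can be sketched as a small combiner on the receiving side. This is an illustration under simplifying assumptions, not Flink's implementation (which, among other things, also handles idle inputs): the task keeps the latest watermark of each input channel, takes the minimum, and forwards it only when that minimum advances. The class name is hypothetical.

```java
import java.util.Arrays;

/** Illustrative min-of-inputs watermark combiner for one downstream task. */
final class MinWatermarkCombiner {
    private final long[] perChannel;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkCombiner(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel; returns the new combined watermark, or null if nothing is forwarded. */
    Long onWatermark(int channel, long wm) {
        perChannel[channel] = Math.max(perChannel[channel], wm); // a channel's watermark never goes backwards
        long min = Long.MAX_VALUE;
        for (long w : perChannel) min = Math.min(min, w);
        if (min > lastEmitted) {   // forward only when the minimum advances
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        MinWatermarkCombiner c = new MinWatermarkCombiner(2);
        System.out.println(c.onWatermark(0, 5_000L)); // null: channel 1 has sent nothing yet
        System.out.println(c.onWatermark(1, 3_000L)); // 3000: the minimum of 5000 and 3000
        System.out.println(c.onWatermark(1, 3_000L)); // null: the minimum did not advance
        System.out.println(c.onWatermark(1, 9_000L)); // 5000: channel 0 now holds the minimum
    }
}
```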

[Figure: watermark propagation across parallel instances]

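Summary: with multiple parallel instances, the watermark forwarded downstream is always the smallest of the incoming watermarks. This is the "wooden barrel" principle: a barrel holds only as much water as its shortest stave.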

