Flink: Time and Watermark (Part 2)


(3.3) Testing the watermark mechanism with event-time sliding windows

Code:

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
/**
 * @author caizhengjie
 * @description Tests the watermark mechanism with event-time sliding windows
 * @date 2021/3/20 9:21 PM
 */
public class EventTimeSliding {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them to a JavaBean (format: id,ts,vc)
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Maximum allowed out-of-orderness (delay)
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // Specify the event-time field; ts is in seconds, Flink expects milliseconds
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Assign sliding windows: size 6 s, slide 2 s
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(2)));
        // 6. Sum vc; the result keeps the id and ts of the first element in the window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
}

Test data:

ws_001,1577844001,1
ws_001,1577844008,1
ws_001,1577844012,1

Output:

WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844001, vc=1}
WaterSensor{id='ws_001', ts=1577844008, vc=1}

Explanation:

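The sliding window has a size of 6 s and a slide of 2 s. Times below are relative to 1577844000 s. An event with event time 1 s belongs to three windows: [-4,2), [-2,4) and [0,6). When the event with event time 8 s arrives, the watermark (wm) becomes 6 s, which reaches the upper bound of [0,6). That window closes, and since the two earlier windows are also past their bounds, all three fire at once, so three results are printed immediately. The 8 s event itself belongs to [4,10), [6,12) and [8,14). To get one more result, input an event with event time 12 s: wm becomes 10 s, which reaches the upper bound of [4,10), so that window fires and prints one result. (Strictly speaking, the built-in strategy emits a watermark 1 ms lower, and a window fires when the watermark reaches its end minus 1 ms, so the outcome is the same.)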
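The window arithmetic in the explanation above can be checked without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class `SlidingWindowSketch` and its methods are hypothetical names introduced here for illustration. It assumes a window offset of 0, a watermark of "max timestamp - delay - 1 ms" (as the built-in bounded-out-of-orderness strategy emits), and that a window fires once the watermark reaches its end minus 1 ms.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink dependency) of sliding-window assignment and firing. */
class SlidingWindowSketch {

    /** Start times (ms) of every sliding window containing the timestamp, newest first. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start: round the timestamp down to a multiple of the slide (offset 0 assumed).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    /** A window [start, start + size) fires once the watermark reaches its last millisecond. */
    static boolean fires(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 6_000L, slide = 2_000L, delay = 2_000L;
        // Times are relative to 1577844000 s, as in the explanation.
        System.out.println(windowStarts(1_000L, size, slide)); // [0, -2000, -4000]
        System.out.println(windowStarts(8_000L, size, slide)); // [8000, 6000, 4000]
        // Watermark after the 8 s event: 8000 - 2000 - 1 = 5999 ms.
        long wmAfter8s = 8_000L - delay - 1;
        System.out.println(fires(0L, size, wmAfter8s));     // true: [0,6) fires
        System.out.println(fires(4_000L, size, wmAfter8s)); // false: [4,10) waits for the 12 s event
    }
}
```

Because the slide is smaller than the size, every event lands in size / slide = 3 windows, which is why a single input line can produce three output lines.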


(3.4) Testing the watermark mechanism with event-time session windows

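Gap: the required distance between the watermark and the event time of the last element in the session. The session fires once the watermark reaches that event time plus the gap. The boundary is inclusive: an element whose event time is exactly one gap after the previous element still joins the same session.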


Code:

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
/**
 * @author caizhengjie
 * @description Tests the watermark mechanism with event-time session windows
 * @date 2021/3/20 9:21 PM
 */
public class EventTimeSession {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them to a JavaBean (format: id,ts,vc)
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Maximum allowed out-of-orderness (delay)
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // Specify the event-time field; ts is in seconds, Flink expects milliseconds
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Assign session windows with a 5 s gap; the session fires once the
        //    watermark reaches the last element's event time plus the gap (inclusive)
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        // 6. Sum vc; the result keeps the id and ts of the first element in the window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
}

Test data:

ws_001,1577844002,1
ws_001,1577844007,1
ws_001,1577844014,1


Output:

WaterSensor{id='ws_001', ts=1577844002, vc=2}

Explanation:

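The session gap is 5 s. Times below are relative to 1577844000 s. The first event has event time 2 s and the second has event time 7 s. Neither triggers a window, because a session fires only when the watermark >= the event time of the last element in the session + the gap (5 s). Since the gap is inclusive, the 7 s event joins the session opened by the 2 s event. When the event with event time 14 s arrives, wm is 12 s >= 7 + 5, so the session fires and prints one result that sums the two elements (vc=2).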
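The session behaviour described above can be reproduced with a small plain-Java sketch. This is not Flink code: `SessionWindowSketch` and `sessions` are hypothetical names introduced for illustration. It assumes events arrive in event-time order, that each event opens a window [ts, ts + gap), and that windows which touch or overlap are merged, which is what makes the gap boundary inclusive.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink dependency) of session-window merging. */
class SessionWindowSketch {

    /** Groups sorted event times (ms) into sessions; each entry is {start, end, elementCount}. */
    static List<long[]> sessions(long[] sortedEventTimesMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedEventTimesMs) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            // Touching windows merge, so an event exactly one gap later still joins the session.
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gapMs);
                last[2]++;
            } else {
                result.add(new long[] {ts, ts + gapMs, 1});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 5_000L, delay = 2_000L;
        // Times are relative to 1577844000 s, as in the explanation.
        List<long[]> all = sessions(new long[] {2_000L, 7_000L, 14_000L}, gap);
        // Watermark after the 14 s event: 14000 - 2000 - 1 = 11999 ms.
        long watermark = 14_000L - delay - 1;
        for (long[] w : all) {
            boolean fired = watermark >= w[1] - 1;
            System.out.println("[" + w[0] + ", " + w[1] + ") count=" + w[2] + " fired=" + fired);
        }
        // [2000, 12000) count=2 fired=true
        // [14000, 19000) count=1 fired=false
    }
}
```

The first session holds two elements and fires, which matches the single vc=2 result in the output; the 14 s event opens a new session that stays open until a later event pushes the watermark past 19 s.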


(4) Custom WatermarkStrategy


There are two styles of watermark generation: periodic and punctuated.

Both are written by implementing the WatermarkGenerator interface.


(4.1) Periodic

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * @author caizhengjie
 * @description Tests a custom periodic watermark generator with event-time tumbling windows
 * @date 2021/3/20 9:21 PM
 */
public class EventTimeTumblingCustomerPeriod {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them to a JavaBean (format: id,ts,vc)
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        // Custom generator with a maximum delay of 2000 ms
                        return new MyPeriod(2000L);
                    }
                }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // ts is in seconds, Flink expects milliseconds
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Assign tumbling windows of 5 s
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 6. Sum vc; the result keeps the id and ts of the first element in the window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
    /**
     * Custom periodic watermark generator
     */
    public static class MyPeriod implements WatermarkGenerator<WaterSensor> {
        // Largest event timestamp seen so far, in ms
        private Long maxTs;
        // Maximum allowed delay, in ms
        private Long maxDelay;
        public MyPeriod(Long maxDelay) {
            this.maxDelay = maxDelay;
            // Start high enough that maxTs - maxDelay cannot underflow
            this.maxTs = Long.MIN_VALUE + maxDelay + 1;
        }
        // Called once per element; only tracks the timestamp used for the next watermark
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            // A new element arrived: keep the largest timestamp
            System.out.println("Updating the max timestamp");
            maxTs = Math.max(eventTimestamp, maxTs);
        }
        // Called periodically to emit the watermark; the default interval is 200 ms
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Periodic emission: in effect Flink's clock runs behind by the maximum delay
            System.out.println("Emitting watermark " + (maxTs - maxDelay));
            output.emitWatermark(new Watermark(maxTs - maxDelay));
        }
    }
}

(4.2) Punctuated

package com.aikfk.flink.datastream.watermark;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * @author caizhengjie
 * @description Tests a custom punctuated watermark generator with event-time tumbling windows
 * @date 2021/3/20 9:21 PM
 */
public class EventTimeTumblingCustomerPunt {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read lines from the socket and convert them to a JavaBean (format: id,ts,vc)
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3. Extract the timestamp field from the data and generate watermarks
        SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS
                .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() {
                    @Override
                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        // Custom generator with a maximum delay of 2000 ms
                        return new MyPunt(2000L);
                    }
                }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // ts is in seconds, Flink expects milliseconds
                        return element.getTs() * 1000L;
                    }
                }));
        // 4. Key by id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId);
        // 5. Assign tumbling windows of 5 s
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        // 6. Sum vc; the result keeps the id and ts of the first element in the window
        SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc());
            }
        });
        // 7. Print
        result.print();
        // 8. Execute the job
        env.execute();
    }
    /**
     * Custom punctuated watermark generator
     */
    public static class MyPunt implements WatermarkGenerator<WaterSensor> {
        // Largest event timestamp seen so far, in ms
        private Long maxTs;
        // Maximum allowed delay, in ms
        private Long maxDelay;
        public MyPunt(Long maxDelay) {
            this.maxDelay = maxDelay;
            // Start high enough that maxTs - maxDelay cannot underflow
            this.maxTs = Long.MIN_VALUE + maxDelay + 1;
        }
        // Called for every element: update the max timestamp and emit a watermark right away
        @Override
        public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("Updating the max timestamp");
            maxTs = Math.max(eventTimestamp, maxTs);
            output.emitWatermark(new Watermark(maxTs - maxDelay));
        }
        // Called periodically; intentionally empty because watermarks are emitted in onEvent
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }
}

Test data:

ws_001,1577844001,1
ws_001,1577844002,1
ws_001,1577844012,1

Output:

Updating the max timestamp
Updating the max timestamp
Updating the max timestamp
WaterSensor{id='ws_001', ts=1577844001, vc=2}
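To see why the third input line is the one that produces a result, the punctuated generator and the 5 s tumbling windows can be replayed in plain Java. This is a simplified sketch under stated assumptions, not Flink code: `PunctuatedTrace` and `replay` are hypothetical names, a single key is assumed, late elements are not handled, and the watermark is computed as max timestamp - max delay, as in the custom generator of this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java replay (no Flink dependency) of a punctuated generator feeding tumbling windows. */
class PunctuatedTrace {

    /** Returns one line per fired window, in firing order. Assumes one key and no late data. */
    static List<String> replay(long[] eventTimesMs, long windowSizeMs, long maxDelayMs) {
        List<String> fired = new ArrayList<>();
        TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> element count
        long maxTs = Long.MIN_VALUE + maxDelayMs + 1;    // same underflow guard as the generator
        for (long ts : eventTimesMs) {
            // The element is assigned to its window before the new watermark is emitted.
            counts.merge(ts - Math.floorMod(ts, windowSizeMs), 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            long watermark = maxTs - maxDelayMs;
            // Fire every window whose last millisecond is covered by the watermark.
            while (!counts.isEmpty() && counts.firstKey() + windowSizeMs - 1 <= watermark) {
                Map.Entry<Long, Integer> w = counts.pollFirstEntry();
                fired.add("[" + w.getKey() + ", " + (w.getKey() + windowSizeMs) + ") count=" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Times are relative to 1577844000 s: the events at 1 s, 2 s and 12 s from the test data.
        System.out.println(replay(new long[] {1_000L, 2_000L, 12_000L}, 5_000L, 2_000L));
        // [[0, 5000) count=2]
    }
}
```

After the 1 s and 2 s events the watermark is at most 0 s, so [0,5) stays open. The 12 s event raises the watermark to 10 s, which closes [0,5) with two elements, while [10,15) remains open.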


(5) Watermark propagation with multiple parallel subtasks


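Watermark propagation:

  1. Watermarks are sent downstream by broadcast.
  2. The watermark of a parallel subtask is the minimum of the watermarks it has received from all upstream subtasks.
  3. When the watermark value has not advanced, it is not passed downstream. Note that watermark generation itself is unchanged.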


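Summary: with multiple parallel subtasks, the watermark passed downstream is always the smallest one received. This is the bucket principle: the shortest stave decides how much water the bucket holds.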
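The minimum rule can be sketched in a few lines of plain Java. This is an illustration of the principle only, not Flink's internal implementation: `MinWatermarkSketch` and `onWatermark` are hypothetical names, and each upstream subtask is modelled as an input channel that starts at Long.MIN_VALUE.

```java
import java.util.Arrays;

/** Plain-Java sketch (no Flink dependency) of combining watermarks from several inputs. */
class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkSketch(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns the watermark to forward, or null if it did not advance. */
    Long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null; // no progress, nothing is sent downstream
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(2);
        System.out.println(task.onWatermark(0, 5_000L)); // null: channel 1 has not reported yet
        System.out.println(task.onWatermark(1, 3_000L)); // 3000: the slower channel decides
        System.out.println(task.onWatermark(0, 9_000L)); // null: the minimum is still 3000
        System.out.println(task.onWatermark(1, 7_000L)); // 7000
    }
}
```

A practical consequence of this rule is that one slow or idle upstream subtask holds back the event-time progress of everything downstream.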

