Flink 实现自定义滑动窗口

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 背景一般情况下 Flink 提供的窗口可以满足我们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午 7 点到第二天上午 7 点。类似于这种情况 Flink 默认提供的窗口是不支持的,因为 Flink 计算窗口的开始时间和结束时间是根据数据本身携带的时间戳然后把数据划分到不同的窗口的,所以它不是一个固定的范围。这个时候就需要我们自己实现窗口划分的逻辑。Flink 提供了 WindowAssigner 抽象类,我们只需要实现 assignWindows 方法即可。

背景


一般情况下 Flink 提供的窗口可以满足我们大部分的场景,但是有时候我们需要计算一个固定时间范围内的数据,比如实时计算每天凌晨到第二天凌晨的数据,或者每天上午 7 点到第二天上午 7 点。类似于这种情况 Flink 默认提供的窗口是不支持的,因为 Flink 计算窗口的开始时间和结束时间是根据数据本身携带的时间戳然后把数据划分到不同的窗口的,所以它不是一个固定的范围。这个时候就需要我们自己实现窗口划分的逻辑。Flink 提供了 WindowAssigner 抽象类,我们只需要实现 assignWindows 方法即可。


WindowAssigner 源码


@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(
            T element, long timestamp, WindowAssignerContext context);
    /** Returns the default trigger associated with this {@code WindowAssigner}. */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by this {@code
     * WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * otherwise.
     */
    public abstract boolean isEventTime();
}


其中 assignWindows 方法决定了一条数据应该划分到几个窗口里面,getDefaultTrigger 返回和 WindowAssigner 相关联的默认触发器,决定何时触发窗口计算,getWindowSerializer 返回窗口的序列化器,isEventTime 返回是否是 eventtime 时间语义。


自定义 MyEventTimeWindow 实现


package flink.streaming.window;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.List;
/**
 * 自定义实现 window
 */
public class MyEventTimeWindow extends WindowAssigner<Object, TimeWindow> {
    // 窗口的大小
    private final long size;
    // 多长时间滑动一次
    private final long slide;
    // 窗口偏移量
    private final long offset;
    protected MyEventTimeWindow(long size, long slide, long offset) {
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }
    public static MyEventTimeWindow of(Time size, Time slide, Time offset) {
        return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
    }
    public static MyEventTimeWindow of(Time size, Time slide) {
        return new MyEventTimeWindow(size.toMilliseconds(), slide.toMilliseconds(), 0L);
    }
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext windowAssignerContext) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(timestamp);
        // 设置从每天的0点开始计算
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        // 获取窗口的开始时间 其实就是 0 点
        long winStart = calendar.getTimeInMillis();
        // 获取窗口的结束时间,就是在开始时间的基础上加上窗口的长度 这里是 1 天
        calendar.add(Calendar.DATE, 1);
        // 获取窗口的结束时间 其实就是第二天的 0 点
        long winEnd = calendar.getTimeInMillis() + 1;
        String format = String.format("window的开始时间:%s,window的结束时间:%s", winStart, winEnd);
        System.out.println(format);
        // 当前数据所属窗口的结束时间
        long currentWindowEnd = TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + slide;
        System.out.println(TimeWindow.getWindowStartWithOffset(timestamp, this.offset, this.slide) + "====" + currentWindowEnd);
        // 一条数据属于几个窗口 因为是滑动窗口一条数据会分配到多个窗口里
        int windowCounts = (int) ((winEnd - currentWindowEnd) / slide);
        List<TimeWindow> windows = new ArrayList<>(windowCounts);
        long currentEnd = currentWindowEnd;
        if (timestamp > Long.MIN_VALUE) {
            while (currentEnd < winEnd) {
                windows.add(new TimeWindow(winStart, currentEnd));
                currentEnd += slide;
            }
        }
        return windows;
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
        return EventTimeTrigger.create();
    }
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }
    @Override
    public boolean isEventTime() {
        return true;
    }
}


这里主要有两个问题:


1,数据被分到几个窗口?窗口的长度 / 窗口滑动的步长 = 窗口的个数。


2,窗口的开始时间和结束时间怎么计算?对应的 TimeWindow#getWindowStartWithOffset 方法。


getWindowStartWithOffset 源码


/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}


窗口的开始时间主要就是通过上面的算法计算而来,有了窗口的开始时间,那结束时间就非常简单了,直接加上窗口的大小就好了。


验证结果


window = ds.window(MyEventTimeWindow.of(Time.days(1), Time.hours(1)))
window的开始时间:1639238400000(2021-12-12 00:00:00),window的结束时间:1639324800001(2021-12-13 00:00:00)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639242000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639245600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639249200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639252800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639256400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639260000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639263600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639267200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639270800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639274400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639278000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639281600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639285200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639288800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639292400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639296000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639299600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639303200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639306800000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639310400000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639314000000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639317600000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639321200000)
3> FlinkWindowWordCountDemo.WindowResult(name=JasonLee, count=5, windowStart=1639238400000, windowEnd=1639324800000)


窗口的开始时间是 2021-12-12 00:00:00,窗口的结束时间是 2021-12-13 00:00:00。窗口的长度是 24 小时,每隔 1 小时滑动一次,所以一条数据会分配到 24 个窗口里面,所以触发了 24 个窗口计算,结果也没有问题,这样就实现了任意时间的滑动窗口。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源
45 0
|
6月前
|
关系型数据库 MySQL 流计算
Flink自定义sink写入mysql
Flink自定义sink写入mysql
92 0
|
6月前
|
流计算
Flink自定义source、自定义sink
Flink自定义source、自定义sink
120 0
|
消息中间件 SQL Java
Flink自定义Connector
Flink自定义Connector
462 0
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
1月前
|
分布式计算 监控 大数据
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction
46 0
|
1月前
|
消息中间件 Java Kafka
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
69 7
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在自定义RichSinkFunction中,如何获取source的schema
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 数据库连接 数据处理
实时计算 Flink版产品使用合集之下游连接器的写入频率自定义配置的步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版产品使用合集之如果想自定义connector和pipeline要如何入手
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
无影云桌面