大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink Window 背景总览

Flink Window 滚动时间窗口

基于时间驱动

基于事件驱动

滑动时间窗口

滑动窗口是固定窗口更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。Flink 的滑动时间窗口(Sliding Window)是一种常用的窗口机制,适用于处理流式数据时需要在时间范围内定期计算的场景。滑动窗口会按照指定的窗口大小(window size)和滑动步长(slide interval)不断地划分数据,并对每个窗口内的数据进行聚合计算。


类型特点

窗口长度固定,可以有重叠。


滑动窗口会有重叠部分,因此每个事件可能会被包含在多个窗口中。

滑动窗口更适合定期计算某个时间范围内的聚合值,像是移动平均值、最近一段时间的活跃用户等场景。

关键参数

窗口大小(window size):每个窗口包含的时间范围,例如 10 秒。

滑动步长(slide interval):窗口每次滑动的时间步长,例如 5 秒。这意味着每隔 5 秒就会创建一个新的窗口,每个窗口覆盖的时间范围是 10 秒。

基于时间驱动

场景:我们可以每30秒计算一次最近一分钟用户购买的商品数

package icu.wzk;


import org.apache.commons.math3.analysis.function.Sin;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random +
                                ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10), Time.seconds(5));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        
        env.execute("SlidingWindow");
    }

}

基于事件驱动

package icu.wzk;


import org.apache.commons.math3.analysis.function.Sin;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


import java.text.SimpleDateFormat;
import java.util.Random;

public class SlidingWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random +
                                ", timestamp: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3, 2);
        globalWindow.apply(new MyCountWindowFuntion()).print();
        
        env.execute("SlidingWindow");
    }

}

会话窗口

由一系列事件组合一个指定时间长度timeout间隙组成,类似于Web应用的Session,也就是一段时间没有接收到新数据会生成新的窗口。

Session窗口分配器通过Session活动来对元素进行分组,Session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况。

Session窗口在一个固定的时间周期内不再收到元素,即非活动间隔产生,那么这个窗口就会关闭。

一个Session窗口通过一个Session间隔来配置,这个Session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的Session将关闭并且后续的元素将被分配到新的Session窗口去。


类型特点

会话窗口不重叠,没有固定的开始和结束时间

于翻滚窗口和滑动窗口相反,当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。

后续的元素将会被分配到新的会话窗口

基于时间驱动

package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SessionWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {

                        return null;
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        window.apply(new MyTimeWindowFunction()).print();
        env.execute("SessionWindow");
    }

}


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
167 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
97 0
|
19天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
128 27
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
173 56
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
78 1
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
91 1
|
3月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
136 0
|
3月前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
151 0
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
64 0
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。