Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

上节进度

上节修改了数据源Socketkafka。此外,完成了 滚动窗口-事件驱动

核心代码

三个数据(key相同数据)触发一次事件

WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream.countWindow(3);
        countWindow.apply(new MyCountWindowFunction()).print();

运行效果如下图

滑动窗口

什么是滑动窗口

Flink 的滑动窗口(Sliding Window)是一种在流处理应用中使用的窗口类型,用于对连续流数据进行分割和处理。

滑动窗口相对于滚动窗口(Tumbling Window)来说更灵活,因为它允许窗口在时间上重叠,从而可以更加精细地分析流数据。


滑动窗口按照固定的时间间隔(滑动步长)在数据流上滑动,并生成多个窗口。

这些窗口可以重叠,因此每条数据可能会被分配到多个窗口中。每个窗口都会独立地进行计算和聚合操作。


|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
  0    5    10   15   20   25   30   35   40   45   50   55   60  (时间轴)

窗口 1:从 0 分钟到 10 分钟
窗口 2:从 5 分钟到 15 分钟
窗口 3:从 10 分钟到 20 分钟
窗口 4:从 15 分钟到 25 分钟
窗口 5:从 20 分钟到 30 分钟
...

时间驱动

StartApp

package icu.wzk.demo07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * 滑动窗口 SlidingWindow
 * 窗口长度固定 可以有重叠
 * 基于时间驱动、基于事件驱动
 * @author wzk
 * @date 10:51 2024/6/22
**/
public class SlidingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = RANDOM.nextInt(10);
                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // ==================== 时间驱动 ============================
        // 基于时间驱动,每隔5s计算一下最近10s的数据
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
        timeWindow.sum(1).print();
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute();
    }

}

MyTimeWindowFunction

package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;


/**
 * 基于时间驱动 TimeWindow
 * @author wzk
 * @date 10:26 2024/6/22
**/
public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, String, TimeWindow> {

    @Override
    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        int sum = 0;

        for(Tuple2<String,Integer> tuple2 : input){
            sum +=tuple2.f1;
        }

        long start = window.getStart();
        long end = window.getEnd();

        out.collect("key:" + s + " value: " + sum + "| window_start :"
                + format.format(start) + "  window_end :" + format.format(end)
        );
    }
}

事件驱动

StartApp

package icu.wzk.demo07;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * 滑动窗口 SlidingWindow
 * 窗口长度固定 可以有重叠
 * 基于时间驱动、基于事件驱动
 * @author wzk
 * @date 10:51 2024/6/22
**/
public class SlidingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = RANDOM.nextInt(10);
                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });
        // =================== 事件驱动 =============================
        //基于事件驱动,每隔2个事件,触发一次计算,本次窗口的大小为3,代表窗口里的每种事件最多为3个
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream
                .countWindow(3, 2);
        countWindow.sum(1).print();
        countWindow.apply(new MyCountWindowFunction()).print();
        env.execute();
    }
}

MyCountWindowFunction

package icu.wzk.demo06;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;


/**
 * 基于事件驱动 GlobalWindow
 * @author wzk
 * @date 10:27 2024/6/22
**/
public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> {

    @Override
    public void apply(String s, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input){
            sum += tuple2.f1;
        }
        // 无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + s + " value: " + sum + "| maxTimeStamp :"
                + maxTimestamp + "," + format.format(maxTimestamp)
        );
    }
}


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
11月前
|
存储 监控 安全
单位网络监控软件:Java 技术驱动的高效网络监管体系构建
在数字化办公时代,构建基于Java技术的单位网络监控软件至关重要。该软件能精准监管单位网络活动,保障信息安全,提升工作效率。通过网络流量监测、访问控制及连接状态监控等模块,实现高效网络监管,确保网络稳定、安全、高效运行。
256 11
|
1月前
|
人工智能 算法 Java
Java与AI驱动区块链:构建智能合约与去中心化AI应用
区块链技术和人工智能的融合正在开创去中心化智能应用的新纪元。本文深入探讨如何使用Java构建AI驱动的区块链应用,涵盖智能合约开发、去中心化AI模型训练与推理、数据隐私保护以及通证经济激励等核心主题。我们将完整展示从区块链基础集成、智能合约编写、AI模型上链到去中心化应用(DApp)开发的全流程,为构建下一代可信、透明的智能去中心化系统提供完整技术方案。
223 3
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
232 7
|
2月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1074 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
3月前
|
JavaScript 安全 前端开发
Java开发:最新技术驱动的病人挂号系统实操指南与全流程操作技巧汇总
本文介绍基于Spring Boot 3.x、Vue 3等最新技术构建现代化病人挂号系统,涵盖技术选型、核心功能实现与部署方案,助力开发者快速搭建高效、安全的医疗挂号平台。
220 3
|
5月前
|
消息中间件 机器学习/深度学习 Java
java 最新技术驱动的智能教育在线实验室设备管理与实验资源优化实操指南
这是一份基于最新技术的智能教育在线实验室设备管理与实验资源优化的实操指南,涵盖系统搭建、核心功能实现及优化策略。采用Flink实时处理、Kafka消息队列、Elasticsearch搜索分析和Redis缓存等技术栈,结合强化学习动态优化资源调度。指南详细描述了开发环境准备、基础组件部署、数据采集与处理、模型训练、API服务集成及性能调优步骤,支持高并发设备接入与低延迟处理,满足教育机构数字化转型需求。代码已提供下载链接,助力快速构建智能化实验室管理系统。
163 44
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
239 2
利用java8 的 CompletableFuture 优化 Flink 程序
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
334 0
|
8月前
|
SQL Java 中间件
【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常
在BeetISQL 2.13.8版本中,客户使用batch insert向yashandb表插入数据并尝试获取自动生成的sequence id时,出现类型转换异常。原因是beetlsql在prepareStatement时未指定返回列,导致yashan JDBC驱动返回rowid(字符串),与Java Bean中的数字类型tid不匹配。此问题影响业务流程,使无法正确获取sequence id。解决方法包括:1) 在batchInsert时不返回自动生成的sequence id;2) 升级至BeetISQL 3,其已修正该问题。
【YashanDB知识库】yasdb jdbc驱动集成BeetISQL中间件,业务(java)报autoAssignKey failure异常