Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing

简介: Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

上节进度

上一节,已经实现了 Flink + Kafka + Redis 的相关配置和代码。

Kakfa Redis In Docker

KafkaProducer

StartApp

这里没有数据是正常的,因为都 写入到 Redis 了,并没有进行计算。

Redis数据

滚动窗口

Flink 中的滚动窗口(Tumbling Window)是一种常见的窗口机制,用于对数据流进行分割和处理。在滚动窗口中,时间驱动是窗口触发和关闭的关键机制。


什么是滚动窗口?

滚动窗口将数据流按照固定的时间间隔进行分割,每个时间间隔形成一个独立的窗口。滚动窗口的特点是窗口之间不重叠,每个元素只属于一个窗口。

时间驱动

核心代码

WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        timeWindow.apply(new MyTimeWindowFunction()).print();

StartApp

package icu.wzk.demo06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.java.tuple.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Random;


/**
 * 滚动时间窗口 Tumbling Window
 * 时间对齐,窗口长度固定,没有重叠
 * @author wzk
 * @date 10:48 2024/6/22
**/
public class TumblingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        //设置执行环境,类似spark中初始化sparkContext
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = RANDOM.nextInt(10);
                System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // =============== 时间驱动 ============================
        // 每隔10s划分一个窗口
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute();
    }

}

MyTimeWindowFunction

package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;


/**
 * 基于时间驱动 TimeWindow
 * @author wzk
 * @date 10:26 2024/6/22
**/
public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, String, TimeWindow> {

    @Override
    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        int sum = 0;

        for(Tuple2<String,Integer> tuple2 : input){
            sum +=tuple2.f1;
        }

        long start = window.getStart();
        long end = window.getEnd();

        out.collect("key:" + s + " value: " + sum + "| window_start :"
                + format.format(start) + "  window_end :" + format.format(end)
        );
    }
}


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
6月前
|
人工智能 算法 Java
Java与AI驱动区块链:构建智能合约与去中心化AI应用
区块链技术和人工智能的融合正在开创去中心化智能应用的新纪元。本文深入探讨如何使用Java构建AI驱动的区块链应用,涵盖智能合约开发、去中心化AI模型训练与推理、数据隐私保护以及通证经济激励等核心主题。我们将完整展示从区块链基础集成、智能合约编写、AI模型上链到去中心化应用(DApp)开发的全流程,为构建下一代可信、透明的智能去中心化系统提供完整技术方案。
437 3
消息中间件 存储 传感器
417 0
|
8月前
|
JavaScript 安全 前端开发
Java开发:最新技术驱动的病人挂号系统实操指南与全流程操作技巧汇总
本文介绍基于Spring Boot 3.x、Vue 3等最新技术构建现代化病人挂号系统,涵盖技术选型、核心功能实现与部署方案,助力开发者快速搭建高效、安全的医疗挂号平台。
404 5
|
10月前
|
消息中间件 机器学习/深度学习 Java
java 最新技术驱动的智能教育在线实验室设备管理与实验资源优化实操指南
这是一份基于最新技术的智能教育在线实验室设备管理与实验资源优化的实操指南,涵盖系统搭建、核心功能实现及优化策略。采用Flink实时处理、Kafka消息队列、Elasticsearch搜索分析和Redis缓存等技术栈,结合强化学习动态优化资源调度。指南详细描述了开发环境准备、基础组件部署、数据采集与处理、模型训练、API服务集成及性能调优步骤,支持高并发设备接入与低延迟处理,满足教育机构数字化转型需求。代码已提供下载链接,助力快速构建智能化实验室管理系统。
255 44
|
10月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
506 0
|
10月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
325 11
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
680 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
12月前
|
存储 机器学习/深度学习 监控
如何监控员工的电脑——基于滑动时间窗口的Java事件聚合算法实现探析​
在企业管理场景中,如何监控员工的电脑操作行为是一个涉及效率与合规性的重要课题。传统方法依赖日志采集或屏幕截图,但数据量庞大且实时性不足。本文提出一种基于滑动时间窗口的事件聚合算法,通过Java语言实现高效、低资源占用的监控逻辑,为如何监控员工的电脑提供一种轻量化解决方案。
447 3
|
SQL druid Oracle
【YashanDB知识库】yasdb jdbc驱动集成druid连接池,业务(java)日志中有token IDENTIFIER start异常
客户Java日志中出现异常,影响Druid的merge SQL功能(将SQL字面量替换为绑定变量以统计性能),但不影响正常业务流程。原因是Druid在merge SQL时传入null作为dbType,导致无法解析递归查询中的`start`关键字。
下一篇
开通oss服务