Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing

代码仓库

会同步代码到 GitHub

https://github.com/turbo-duck/flink-demo

上节进度

上一节,已经实现了 Flink + Kafka + Redis 的相关配置和代码。

Kakfa Redis In Docker

KafkaProducer

StartApp

这里没有数据是正常的,因为都 写入到 Redis 了,并没有进行计算。

Redis数据

滚动窗口

Flink 中的滚动窗口(Tumbling Window)是一种常见的窗口机制,用于对数据流进行分割和处理。在滚动窗口中,时间驱动是窗口触发和关闭的关键机制。


什么是滚动窗口?

滚动窗口将数据流按照固定的时间间隔进行分割,每个时间间隔形成一个独立的窗口。滚动窗口的特点是窗口之间不重叠,每个元素只属于一个窗口。

时间驱动

核心代码

WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        timeWindow.apply(new MyTimeWindowFunction()).print();

StartApp

package icu.wzk.demo06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.java.tuple.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Random;


/**
 * 滚动时间窗口 Tumbling Window
 * 时间对齐,窗口长度固定,没有重叠
 * @author wzk
 * @date 10:48 2024/6/22
**/
public class TumblingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        //设置执行环境,类似spark中初始化sparkContext
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = RANDOM.nextInt(10);
                System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // =============== 时间驱动 ============================
        // 每隔10s划分一个窗口
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute();
    }

}

MyTimeWindowFunction

package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;


/**
 * 基于时间驱动 TimeWindow
 * @author wzk
 * @date 10:26 2024/6/22
**/
public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, String, TimeWindow> {

    @Override
    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        int sum = 0;

        for(Tuple2<String,Integer> tuple2 : input){
            sum +=tuple2.f1;
        }

        long start = window.getStart();
        long end = window.getEnd();

        out.collect("key:" + s + " value: " + sum + "| window_start :"
                + format.format(start) + "  window_end :" + format.format(end)
        );
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
2月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
98 0
|
2月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
73 0
|
25天前
|
消息中间件 资源调度 Java
用Java实现samza转换成flink
【10月更文挑战第20天】
|
2月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
118 4
|
2月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
77 3
|
2月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
36 0
|
消息中间件 存储 缓存
Kafka 3.0重磅发布,弃用 Java 8 的支持!
Kafka 3.0重磅发布,弃用 Java 8 的支持!
1153 0
Kafka 3.0重磅发布,弃用 Java 8 的支持!
|
12天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
3天前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####