大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink Time 详解

示例内容分析

Watermark

Watermark

Watermark 在窗口计算中的作用

在使用基于事件时间的窗口时,Flink 依赖 Watermark 来决定何时触发窗口计算。例如,如果你有一个每 10 秒的滚动窗口,当 Watermark 达到某个窗口的结束时间后,Flink 才会触发该窗口的计算。


假设有一个 10 秒的窗口,并且 Watermark 达到 12:00:10,此时 Flink 会触发 12:00:00 - 12:00:10 的窗口计算。


如何处理迟到事件

尽管 Watermark 能有效解决乱序问题,但总有可能会出现事件在生成 Watermark 之后才到达的情况(即“迟到事件”)。为此,Flink 提供了处理迟到事件的机制:


允许一定的延迟处理:可以配置窗口允许迟到的时间。

迟到事件的侧输出流(Side Output):可以将迟到的事件发送到一个侧输出流中,以便后续处理。


DataStream<Tuple2<String, Integer>> mainStream = 
  stream.keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(5))
        .sideOutputLateData(lateOutputTag);

代码实现

数据格式

01,1586489566000
01,1586489567000
01,1586489568000
01,1586489569000
01,1586489570000
01,1586489571000
01,1586489572000
01,1586489573000
01,1586489574000
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000

编写代码

这段代码实现了:


通过 socket 获取实时流数据。

将流数据映射成带有时间戳的二元组形式。

应用了一个允许 5 秒乱序的水印策略,确保 Flink 可以处理乱序的事件流。

按照事件的 key 进行分组,并在事件时间的基础上进行 5 秒的滚动窗口计算。

最后输出每个窗口内事件的时间范围、窗口开始和结束时间等信息。

其中,这里对流数据进行了按 key(事件的第一个字段)分组,并且使用了 滚动窗口(Tumbling Window),窗口长度为 5 秒。

在 apply 方法中,你收集窗口中的所有事件,并根据事件时间戳进行排序,然后输出每个窗口的开始和结束时间,以及窗口中最早和最晚事件的时间戳。

SingleOutputStreamOperator<String> res = waterMark
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) throws Exception {
            return value.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
        @Override
        public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
            List<Long> list = new ArrayList<>();
            for (Tuple2<String, Long> next : input) {
                list.add(next.f1);
            }
            Collections.sort(list);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                    + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
            out.collect(result);
        }
    });

水印的策略,定义了一个Bounded Out-of-Orderness 的水印策略,允许最多 5 秒的事件乱序,在 extractTimestamp 中,提取了事件的时间戳,并打印出每个事件的 key 和对应的事件时间。还维护了一个 currentMaxTimestamp 来记录当前最大的事件时间戳:

WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
    .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {

        Long currentMaxTimestamp = 0L;
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
            long timestamp = element.f1;
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
            return element.f1;
        }
    });

完整代码如下所示,代码实现了一个基于事件时间的流处理系统,并通过水印(Watermark)机制来处理乱序事件:

package icu.wzk;


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


public class WatermarkTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple2<>(split[0], Long.valueOf(split[1]));
                    }
                }
        );

        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {

                    Long currentMaxTimestamp = 0L;
                    final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                        return element.f1;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);
        SingleOutputStreamOperator<String> res = waterMark
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        List<Long> list = new ArrayList<>();
                        for (Tuple2<String, Long> next : input) {
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                                + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        res.print();
        env.execute();
    }
}

运行代码

传入数据

在控制台中,输入如下的数据:

查看结果

控制台运行结果如下:


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
155 56
|
1月前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
69 1
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
74 1
|
2月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
50 4
|
2月前
|
运维 监控 数据可视化
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
81 1
|
2月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
72 1
|
2月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
87 1
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
80 1
下一篇
DataWorks