Flink Window 、Time(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习 Flink Window 、Time 。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink Window 、Time(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10045


Flink Window 、Time (二)

 

三、Window 内部实现

在讨论 Window 内部实现的时候我们再通过下图回顾一下 window 的生命周期。

每条数据过来之后会由 WindowAssigner 分配到对应的 Window,与时间无关,当 Window 被触发之后会交给 Evictor(如果没有设置 Evictor 则跳过)然后处理 UserFunction 其中 WindowAssignerTrigger Evictor 我们都在上面讨论过而 UserFunction 则是用户编写的代码

trigger 的时候会把里面的数据交给 evictor,通过 evictor 通过处理以后交给function。一般来说,就会清空。当然取决于 trigger 结果返回的值。在后面的Demo里会有相应的描述。

这张图穿起来,差不多 Window 相关的就可以了。

watermark 会有怎么触发的 Window。

整个流程还有一个问题需要讨论: Window 中的状态存储

Flink 是支持 Exactly Once 处理语义的那么 Window 中的状态存储和普通的状态存储又有什么不一样的地方呢?

首先给出具体的答案:从接口上可以认为没有区别,但是每个 Window 会属于不同的 namespace .而非 Window 场景下则都属于 VoidNamespace,最终由 State/Checkpoint 来保证数据的 Exactly Once 下面我们从

org. apache.lk. streaming. runt ine. operators windowing. Windowoperator摘取一段代码

进行阐述

for windo: elenentvindow)(

wdrop if the window is already Late

// drop if (iswindoMate(window))

continve:

)

isskippedt tenent false;

windowtate 保存素

首先设置 natespace. 每个  window 都有自己的

naespace

windowstate. set(urrentNanespace(window)

对于这块来说 Window 也提供一个 data once,首先要知道一个 Window 里面会有不同的数据,可以看前面的图。Window前面只有两条数据来了,source 也只会发下来这三条数据,也就是说这两条数据一定是保存在某个位置。Flink 一定会帮助,卸货的时候,就不会丢,保护数据恢复 Window。同样交给 Window,Window 处理三条数据的时候和没有 crash 一样。

Window space 是有 late space,这个表示属于哪个 Window。其他处理一样。

想了解 Window 相关的逻辑,可以看一下 Window operater。里面有一个 processElement,处理每一条数据的时候会经过这里。

drop if the window is already late

it (isWindowLate(window) )

continue:

isSkippedElerent- false;

windowState. setCurrentNamespa(window)

windowState. add (element getvalue ()

triggerContext. key key:

triggerContext. window window;

TriggerResult triggerResult -triggerContext. onElement (element):

(triggerResult.isFire ())

ACC contents windowState. get (

if (contents null){

continue;

emitWindowContents (window, contents):

if (triggerResult.isPurge))

windowstate. clear();

registercleanupTimer(window)

Window late 这个数据晚到了,就直接忽略掉。Window State 里面有 and 和 set,自己处理的时候,没有实际本质区别。把它塞到 State 里面去,然后 Flink 框架提供保证数据一致性。

我们会跑两个 Demo,一个是 session Window,Flink exemple 里面会还有output。 

把日志过滤掉,把 root logger= off 关掉,他会把很多其他一些东西,对于展示,或理解逻辑来说,那些日志不是必须的,可能会造成一些干扰,所以关掉。

首先看一个直观的结果:

input, add(new Tuple3<>("a", 1L, 1))

input.add(new Tuple3<>("b", 1L, 1))

input. add(new Tuple3<>("b", 3L, 1)):

input. add(new Tuple3<>("b", 5L, 1))

t. add (new Tuple3>("a", 1L, 2))

input. add(new Tuple3>("c", 6L, 1))

t to detect the session "a" earlier than this point (the old

1/ functionality can only detect here when the next starts)

input.add (nex Tuple3>("a", 10L, 1))

expect to detect session "b and "c" at this point as well

Tunle3("",11l1); 

如果没有看过 exemple,先看一下。第一个是t,第二个表示带的时间戳。第三个是 Count。

结果:

(a,1,1)

(b,1,3)

(c,6,1)

(a,10,1)

(c,11,1)

a表示 Window start 出现了一次,session Window 看到的是1,剩下三个会合并成一个 Window。

Window 里面会输出这样的结果。

input. add(new Tuple3<>("a",1L,2))这一条数据其实没有的,它是晚到的。结婚里面有 sum,然后 print,就会打印到终端。

(a,1,2)发现没有了。

为了直观感受到,加一点日志,表示当前的 watermark 是什么。触发后会触发哪一个 timer。最好比对代码和结果来学习,看一下结果。

因为时间有限,watermark 没有具体介绍,以及多种 watermark 的方式。

value.f1-1 表示时间戳-1。通过加了日志,设置并发的是1,所以 abc 会都进来,因为是同一个并发。b是1的时候,watermark 本身就是0了,它就不会 advance。a是1,gap 是3,结束。会触发 timer。

Timer(timestamp-3, key (a),

nameppace-TimeWindow( start-1, end-4)

然后输出,sum 之后 print 就可以输出为1。

/Library/Java/JavavirtualMachines/jdk1. 8.

162.jdk/Contents/Home/bin/java

Printing result to stdout. Use --output to specify output path

Advanced watermark 0.

Advanced watermark 2.

Advanced watermark 4.

Timer{timestamp-3, key(a), namespac=TimeWindow{ start=1, end-4))

a,1,1

Advanced waterhark 5.

Advanced watermark 9.

Timer{timestamp-7, key=(b), namespace=TimeWindow{start=1, end=8))

tb,1,3

Timer{timestamp-8, key=(c); namespace=TimeWindow(start-6, end=9)}

tc,6,1

Advanced watermark 10.

Advanced watermark9223372036854775807

Timer(timestamp-12, key-(a) namespace-TimeWindow(start=10, end=13))

有限流,框架会发一个终止的地方。

对于无限流来说,一直没有触发最后一个 Window。需要一个超过 Windowend Time 的 watermark,否则不会被触发。

把日志关掉,看一下结果。

窗口大小3,这条数据是延迟了,看 sideoutput,Time 是两秒,把 side output 加上去之后,得到一个结果。

把 aggregated 给注释掉,不然会有干扰。

final boolean tileOutput params, has value: "output")

tinal List> input new ArrayListo>()

Enput. add [new Tuple3<>("a", 1L, 1))

Enput. add(new Tuple3<>("b", 1L, 1))

input. add (new Tuple3<>("b", 3L, 1))

Anput. add( Tuple3>("", 5L, 1))

ple3("a",1l,2));

input. add (new Tuple3<>("c", 6L, 1))

to detect the sess Don "a" earlier than this point (the old

onality can only detect here when the next starts)

input. add(new Tuple3<>("a", 10L, 1))

expect to detect session "b" and "c" at this point as well

Enput. add( Tuple3<>("c", 11L, 1))

brary/Java/JavaVirtualMachinesjdk1. 8.0 162. jdk/Contents/Home/bin/java

watermark 没有讲到的东西,可以看到 watermark 从哪里来,一直跟到以下代码。

we acknovledge and output the new overall watermark if it really is a aggregated

//fron some remaining aligned channel, and is also larger than the last output watermark

if(hasAlignedchannels newMinWatermark>hastoutputWatermark haslastoutputWatermark true

newMinWatermark: newMinNatermark: 0

output Handler. handleWat Watermark( lastOutputWatermark))

如果自己不太了解,相信加端点,进行调试。

 

四、生产环境中的 Wndow 使用遇到的一些问题

1.壹舟:

在 Periodic Watermarks 情况下。如果把 watermarkSystem 设置成. currentTimeMillis,每隔一个周期 fink 递增的获取 watermark 系统时间作为,是不是意味着小于系统时间的窗口都会立即被触发?

通过 contest,得到 source 的 watermark 的时间戳。对齐后计算新的watermark,看 watermark 是否需要触发窗口。时间戳小于当前 watermark,当然窗口就结束了。

2. 请教一个 state 的间题,在官同的例子, nalzedsta 方法,把 stat 里面的值放到本地变量这样做有什么好处?

没有明显的好处,根据自己需求。

3.请教一个问题,需要在  OpeartorHBase 中查询,但是每条数据查询一次效率很低,所以希望能够攒一批数据再batchHBase 方式查询这个场景是不是可以通过 Window攒批呢?

需要在实际应用中看,不断优化。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
12月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
199 0
|
12月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
255 0
|
12月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
389 0
|
9月前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
1786 28
|
9月前
|
传感器 监控 数据挖掘
Flink 四大基石之 Time (时间语义) 的使用详解
Flink 中的时间分为三类:Event Time(事件发生时间)、Ingestion Time(数据进入系统时间)和 Processing Time(数据处理时间)。Event Time 通过嵌入事件中的时间戳准确反映数据顺序,支持复杂窗口操作。Watermark 机制用于处理 Event Time,确保数据完整性并触发窗口计算。Flink 还提供了多种迟到数据处理方式,如默认丢弃、侧输出流和允许延迟处理,以应对不同场景需求。掌握这些时间语义对编写高效、准确的 Flink 应用至关重要。
497 21
|
12月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
127 0
|
12月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
137 0
|
Java 流计算
Flink学习笔记记录
Flink学习笔记记录
2316 0
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
379 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

热门文章

最新文章