Flink Window 、Time(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习 Flink Window 、Time 。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink Window 、Time(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10045


Flink Window 、Time (二)

 

三、Window 内部实现

在讨论 Window 内部实现的时候我们再通过下图回顾一下 window 的生命周期。

每条数据过来之后会由 WindowAssigner 分配到对应的 Window,与时间无关,当 Window 被触发之后会交给 Evictor(如果没有设置 Evictor 则跳过)然后处理 UserFunction 其中 WindowAssignerTrigger Evictor 我们都在上面讨论过而 UserFunction 则是用户编写的代码

trigger 的时候会把里面的数据交给 evictor,通过 evictor 通过处理以后交给function。一般来说,就会清空。当然取决于 trigger 结果返回的值。在后面的Demo里会有相应的描述。

这张图穿起来,差不多 Window 相关的就可以了。

watermark 会有怎么触发的 Window。

整个流程还有一个问题需要讨论: Window 中的状态存储

Flink 是支持 Exactly Once 处理语义的那么 Window 中的状态存储和普通的状态存储又有什么不一样的地方呢?

首先给出具体的答案:从接口上可以认为没有区别,但是每个 Window 会属于不同的 namespace .而非 Window 场景下则都属于 VoidNamespace,最终由 State/Checkpoint 来保证数据的 Exactly Once 下面我们从

org. apache.lk. streaming. runt ine. operators windowing. Windowoperator摘取一段代码

进行阐述

for windo: elenentvindow)(

wdrop if the window is already Late

// drop if (iswindoMate(window))

continve:

)

isskippedt tenent false;

windowtate 保存素

首先设置 natespace. 每个  window 都有自己的

naespace

windowstate. set(urrentNanespace(window)

对于这块来说 Window 也提供一个 data once,首先要知道一个 Window 里面会有不同的数据,可以看前面的图。Window前面只有两条数据来了,source 也只会发下来这三条数据,也就是说这两条数据一定是保存在某个位置。Flink 一定会帮助,卸货的时候,就不会丢,保护数据恢复 Window。同样交给 Window,Window 处理三条数据的时候和没有 crash 一样。

Window space 是有 late space,这个表示属于哪个 Window。其他处理一样。

想了解 Window 相关的逻辑,可以看一下 Window operater。里面有一个 processElement,处理每一条数据的时候会经过这里。

drop if the window is already late

it (isWindowLate(window) )

continue:

isSkippedElerent- false;

windowState. setCurrentNamespa(window)

windowState. add (element getvalue ()

triggerContext. key key:

triggerContext. window window;

TriggerResult triggerResult -triggerContext. onElement (element):

(triggerResult.isFire ())

ACC contents windowState. get (

if (contents null){

continue;

emitWindowContents (window, contents):

if (triggerResult.isPurge))

windowstate. clear();

registercleanupTimer(window)

Window late 这个数据晚到了,就直接忽略掉。Window State 里面有 and 和 set,自己处理的时候,没有实际本质区别。把它塞到 State 里面去,然后 Flink 框架提供保证数据一致性。

我们会跑两个 Demo,一个是 session Window,Flink exemple 里面会还有output。 

把日志过滤掉,把 root logger= off 关掉,他会把很多其他一些东西,对于展示,或理解逻辑来说,那些日志不是必须的,可能会造成一些干扰,所以关掉。

首先看一个直观的结果:

input, add(new Tuple3<>("a", 1L, 1))

input.add(new Tuple3<>("b", 1L, 1))

input. add(new Tuple3<>("b", 3L, 1)):

input. add(new Tuple3<>("b", 5L, 1))

t. add (new Tuple3>("a", 1L, 2))

input. add(new Tuple3>("c", 6L, 1))

t to detect the session "a" earlier than this point (the old

1/ functionality can only detect here when the next starts)

input.add (nex Tuple3>("a", 10L, 1))

expect to detect session "b and "c" at this point as well

Tunle3("",11l1); 

如果没有看过 exemple,先看一下。第一个是t,第二个表示带的时间戳。第三个是 Count。

结果:

(a,1,1)

(b,1,3)

(c,6,1)

(a,10,1)

(c,11,1)

a表示 Window start 出现了一次,session Window 看到的是1,剩下三个会合并成一个 Window。

Window 里面会输出这样的结果。

input. add(new Tuple3<>("a",1L,2))这一条数据其实没有的,它是晚到的。结婚里面有 sum,然后 print,就会打印到终端。

(a,1,2)发现没有了。

为了直观感受到,加一点日志,表示当前的 watermark 是什么。触发后会触发哪一个 timer。最好比对代码和结果来学习,看一下结果。

因为时间有限,watermark 没有具体介绍,以及多种 watermark 的方式。

value.f1-1 表示时间戳-1。通过加了日志,设置并发的是1,所以 abc 会都进来,因为是同一个并发。b是1的时候,watermark 本身就是0了,它就不会 advance。a是1,gap 是3,结束。会触发 timer。

Timer(timestamp-3, key (a),

nameppace-TimeWindow( start-1, end-4)

然后输出,sum 之后 print 就可以输出为1。

/Library/Java/JavavirtualMachines/jdk1. 8.

162.jdk/Contents/Home/bin/java

Printing result to stdout. Use --output to specify output path

Advanced watermark 0.

Advanced watermark 2.

Advanced watermark 4.

Timer{timestamp-3, key(a), namespac=TimeWindow{ start=1, end-4))

a,1,1

Advanced waterhark 5.

Advanced watermark 9.

Timer{timestamp-7, key=(b), namespace=TimeWindow{start=1, end=8))

tb,1,3

Timer{timestamp-8, key=(c); namespace=TimeWindow(start-6, end=9)}

tc,6,1

Advanced watermark 10.

Advanced watermark9223372036854775807

Timer(timestamp-12, key-(a) namespace-TimeWindow(start=10, end=13))

有限流,框架会发一个终止的地方。

对于无限流来说,一直没有触发最后一个 Window。需要一个超过 Windowend Time 的 watermark,否则不会被触发。

把日志关掉,看一下结果。

窗口大小3,这条数据是延迟了,看 sideoutput,Time 是两秒,把 side output 加上去之后,得到一个结果。

把 aggregated 给注释掉,不然会有干扰。

final boolean tileOutput params, has value: "output")

tinal List> input new ArrayListo>()

Enput. add [new Tuple3<>("a", 1L, 1))

Enput. add(new Tuple3<>("b", 1L, 1))

input. add (new Tuple3<>("b", 3L, 1))

Anput. add( Tuple3>("", 5L, 1))

ple3("a",1l,2));

input. add (new Tuple3<>("c", 6L, 1))

to detect the sess Don "a" earlier than this point (the old

onality can only detect here when the next starts)

input. add(new Tuple3<>("a", 10L, 1))

expect to detect session "b" and "c" at this point as well

Enput. add( Tuple3<>("c", 11L, 1))

brary/Java/JavaVirtualMachinesjdk1. 8.0 162. jdk/Contents/Home/bin/java

watermark 没有讲到的东西,可以看到 watermark 从哪里来,一直跟到以下代码。

we acknovledge and output the new overall watermark if it really is a aggregated

//fron some remaining aligned channel, and is also larger than the last output watermark

if(hasAlignedchannels newMinWatermark>hastoutputWatermark haslastoutputWatermark true

newMinWatermark: newMinNatermark: 0

output Handler. handleWat Watermark( lastOutputWatermark))

如果自己不太了解,相信加端点,进行调试。

 

四、生产环境中的 Wndow 使用遇到的一些问题

1.壹舟:

在 Periodic Watermarks 情况下。如果把 watermarkSystem 设置成. currentTimeMillis,每隔一个周期 fink 递增的获取 watermark 系统时间作为,是不是意味着小于系统时间的窗口都会立即被触发?

通过 contest,得到 source 的 watermark 的时间戳。对齐后计算新的watermark,看 watermark 是否需要触发窗口。时间戳小于当前 watermark,当然窗口就结束了。

2. 请教一个 state 的间题,在官同的例子, nalzedsta 方法,把 stat 里面的值放到本地变量这样做有什么好处?

没有明显的好处,根据自己需求。

3.请教一个问题,需要在  OpeartorHBase 中查询,但是每条数据查询一次效率很低,所以希望能够攒一批数据再batchHBase 方式查询这个场景是不是可以通过 Window攒批呢?

需要在实际应用中看,不断优化。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
59 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
86 0
|
1月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
62 0
|
1月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
30 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
35 0
|
3月前
|
数据安全/隐私保护 流计算
Flink四大基石——2.Time
Flink四大基石——2.Time
38 1
|
3月前
|
数据处理 调度 双11
Flink四大基石——1.window
Flink四大基石——1.window
50 0
|
Java 流计算
Flink学习笔记记录
Flink学习笔记记录
2249 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
8天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
613 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
下一篇
无影云桌面