开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink Window 、Time(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10045
Flink Window 、Time (二)
三、Window 内部实现
在讨论 Window 内部实现的时候我们再通过下图回顾一下 window 的生命周期。
每条数据过来之后会由 WindowAssigner 分配到对应的 Window,与时间无关,当 Window 被触发之后,会交给 Evictor(如果没有设置 Evictor 则跳过)然后处理 UserFunction 其中 WindowAssigner,Trigger, Evictor 我们都在上面讨论过,而 UserFunction 则是用户编写的代码。
trigger 的时候会把里面的数据交给 evictor,通过 evictor 通过处理以后交给function。一般来说,就会清空。当然取决于 trigger 结果返回的值。在后面的Demo里会有相应的描述。
这张图穿起来,差不多 Window 相关的就可以了。
watermark 会有怎么触发的 Window。
整个流程还有一个问题需要讨论: Window 中的状态存储
Flink 是支持 Exactly Once 处理语义的,那么 Window 中的状态存储和普通的状态存储又有什么不一样的地方呢?
首先给出具体的答案:从接口上可以认为没有区别,但是每个 Window 会属于不同的 namespace .而非 Window 场景下,则都属于 VoidNamespace,最终由 State/Checkpoint 来保证数据的 Exactly Once 下面我们从
org. apache.lk. streaming. runt ine. operators windowing. Windowoperator摘
取一段代码
进行阐述。
for windo: elenentvindow)(
wdrop if the window is already Late
// drop if (iswindoMate(window))
continve:
)
isskippedt tenent false;
windowtate 保存素
首先设置 natespace. 每个 window 都有自己的
naespace
windowstate. set(urrentNanespace(window)
对于这块来说 Window 也提供一个 data once,首先要知道一个 Window 里面会有不同的数据,可以看前面的图。Window前面只有两条数据来了,source 也只会发下来这三条数据,也就是说这两条数据一定是保存在某个位置。Flink 一定会帮助,卸货的时候,就不会丢,保护数据恢复 Window。同样交给 Window,Window 处理三条数据的时候和没有 crash 一样。
Window space 是有 late space,这个表示属于哪个 Window。其他处理一样。
想了解 Window 相关的逻辑,可以看一下 Window operater。里面有一个 processElement,处理每一条数据的时候会经过这里。
drop if the window is already late
it (isWindowLate(window) )
continue:
】
isSkippedElerent- false;
windowState. setCurrentNamespa(window)
windowState. add (element getvalue ()
triggerContext. key key:
triggerContext. window window;
TriggerResult triggerResult -triggerContext. onElement (element):
(triggerResult.isFire ())
ACC contents windowState. get (
if (contents null){
continue;
emitWindowContents (window, contents):
if (triggerResult.isPurge))
windowstate. clear();
registercleanupTimer(window)
Window late 这个数据晚到了,就直接忽略掉。Window State 里面有 and 和 set,自己处理的时候,没有实际本质区别。把它塞到 State 里面去,然后 Flink 框架提供保证数据一致性。
我们会跑两个 Demo,一个是 session Window,Flink exemple 里面会还有output。
把日志过滤掉,把 root logger= off 关掉,他会把很多其他一些东西,对于展示,或理解逻辑来说,那些日志不是必须的,可能会造成一些干扰,所以关掉。
首先看一个直观的结果:
input, add(new Tuple3<>("a", 1L, 1))
input.add(new Tuple3<>("b", 1L, 1))
input. add(new Tuple3<>("b", 3L, 1)):
input. add(new Tuple3<>("b", 5L, 1))
t. add (new Tuple3>("a", 1L, 2))
input. add(new Tuple3>("c", 6L, 1))
t to detect the session "a" earlier than this point (the old
1/ functionality can only detect here when the next starts)
input.add (nex Tuple3>("a", 10L, 1))
expect to detect session "b and "c" at this point as well
Tunle3("",11l1);
如果没有看过 exemple,先看一下。第一个是t,第二个表示带的时间戳。第三个是 Count。
结果:
(a,1,1)
(b,1,3)
(c,6,1)
(a,10,1)
(c,11,1)
a表示 Window start 出现了一次,session Window 看到的是1,剩下三个会合并成一个 Window。
Window 里面会输出这样的结果。
input. add(new Tuple3<>("a",1L,2))
这一条数据其实没有的,它是晚到的。结婚里面有 sum,然后 print,就会打印到终端。
(a,1,2)发现没有了。
为了直观感受到,加一点日志,表示当前的 watermark 是什么。触发后会触发哪一个 timer。最好比对代码和结果来学习,看一下结果。
因为时间有限,watermark 没有具体介绍,以及多种 watermark 的方式。
value.f1-1 表示时间戳-1。通过加了日志,设置并发的是1,所以 abc 会都进来,因为是同一个并发。b是1的时候,watermark 本身就是0了,它就不会 advance。a是1,gap 是3,结束。会触发 timer。
Timer(timestamp-3, key (a),
nameppace-TimeWindow( start-1, end-4)
然后输出,sum 之后 print 就可以输出为1。
/Library/Java/JavavirtualMachines/jdk1. 8.
162.jdk/Contents/Home/bin/java
Printing result to stdout. Use --output to specify output path
Advanced watermark 0.
Advanced watermark 2.
Advanced watermark 4.
Timer{timestamp-3, key(a), namespac=TimeWindow{ start=1, end-4))
a,1,1
Advanced waterhark 5.
Advanced watermark 9.
Timer{timestamp-7, key=(b), namespace=TimeWindow{start=1, end=8))
tb,1,3
Timer{timestamp-8, key=(c); namespace=TimeWindow(start-6, end=9)}
tc,6,1
Advanced watermark 10.
Advanced watermark9223372036854775807
Timer(timestamp-12, key-(a) namespace-TimeWindow(start=10, end=13))
有限流,框架会发一个终止的地方。
对于无限流来说,一直没有触发最后一个 Window。需要一个超过 Windowend Time 的 watermark,否则不会被触发。
把日志关掉,看一下结果。
窗口大小3,这条数据是延迟了,看 sideoutput,Time 是两秒,把 side output 加上去之后,得到一个结果。
把 aggregated 给注释掉,不然会有干扰。
final boolean tileOutput params, has value: "output")
tinal List> input new ArrayListo>()
Enput. add [new Tuple3<>("a", 1L, 1))
Enput. add(new Tuple3<>("b", 1L, 1))
input. add (new Tuple3<>("b", 3L, 1))
Anput. add( Tuple3>("", 5L, 1))
ple3("a",1l,2));
input. add (new Tuple3<>("c", 6L, 1))
to detect the sess Don "a" earlier than this point (the old
onality can only detect here when the next starts)
input. add(new Tuple3<>("a", 10L, 1))
expect to detect session "b" and "c" at this point as well
Enput. add( Tuple3<>("c", 11L, 1))
brary/Java/JavaVirtualMachinesjdk1. 8.0 162. jdk/Contents/Home/bin/java
watermark 没有讲到的东西,可以看到 watermark 从哪里来,一直跟到以下代码。
we acknovledge and output the new overall watermark if it really is a aggregated
//fron some remaining aligned channel, and is also larger than the last output watermark
if(hasAlignedchannels newMinWatermark>hastoutputWatermark haslastoutputWatermark true
newMinWatermark: newMinNatermark: 0
output Handler. handleWat Watermark( lastOutputWatermark))
如果自己不太了解,相信加端点,进行调试。
四、生产环境中的 Wndow 使用遇到的一些问题
1.壹舟:
在 Periodic Watermarks 情况下。如果把 watermarkSystem 设置成. currentTimeMillis,每隔一个周期 fink 递增的获取 watermark 系统时间作为,是不是意味着小于系统时间的窗口都会立即被触发?
通过 contest,得到 source 的 watermark 的时间戳。对齐后计算新的watermark,看 watermark 是否需要触发窗口。时间戳小于当前 watermark,当然窗口就结束了。
2. 请教一个 state 的间题,在官同的例子, nalzedsta 方法,把 stat 里面的值放到本地变量这样做有什么好处?
没有明显的好处,根据自己需求。
3.请教一个问题,需要在 OpeartorHBase 中查询,但是每条数据查询一次效率很低,所以希望能够攒一批数据再batchHBase 方式查询这个场景是不是可以通过 Window攒批呢?
需要在实际应用中看,不断优化。