Flink Window 、Time(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink Window 、Time 。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink Window 、Time(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10045


Flink Window 、Time (二)

 

三、Window 内部实现

在讨论 Window 内部实现的时候我们再通过下图回顾一下 window 的生命周期。

每条数据过来之后会由 WindowAssigner 分配到对应的 Window,与时间无关,当 Window 被触发之后会交给 Evictor(如果没有设置 Evictor 则跳过)然后处理 UserFunction 其中 WindowAssignerTrigger Evictor 我们都在上面讨论过而 UserFunction 则是用户编写的代码

trigger 的时候会把里面的数据交给 evictor,通过 evictor 通过处理以后交给function。一般来说,就会清空。当然取决于 trigger 结果返回的值。在后面的Demo里会有相应的描述。

这张图穿起来,差不多 Window 相关的就可以了。

watermark 会有怎么触发的 Window。

整个流程还有一个问题需要讨论: Window 中的状态存储

Flink 是支持 Exactly Once 处理语义的那么 Window 中的状态存储和普通的状态存储又有什么不一样的地方呢?

首先给出具体的答案:从接口上可以认为没有区别,但是每个 Window 会属于不同的 namespace .而非 Window 场景下则都属于 VoidNamespace,最终由 State/Checkpoint 来保证数据的 Exactly Once 下面我们从

org. apache.lk. streaming. runt ine. operators windowing. Windowoperator摘取一段代码

进行阐述

for windo: elenentvindow)(

wdrop if the window is already Late

// drop if (iswindoMate(window))

continve:

)

isskippedt tenent false;

windowtate 保存素

首先设置 natespace. 每个  window 都有自己的

naespace

windowstate. set(urrentNanespace(window)

对于这块来说 Window 也提供一个 data once,首先要知道一个 Window 里面会有不同的数据,可以看前面的图。Window前面只有两条数据来了,source 也只会发下来这三条数据,也就是说这两条数据一定是保存在某个位置。Flink 一定会帮助,卸货的时候,就不会丢,保护数据恢复 Window。同样交给 Window,Window 处理三条数据的时候和没有 crash 一样。

Window space 是有 late space,这个表示属于哪个 Window。其他处理一样。

想了解 Window 相关的逻辑,可以看一下 Window operater。里面有一个 processElement,处理每一条数据的时候会经过这里。

drop if the window is already late

it (isWindowLate(window) )

continue:

isSkippedElerent- false;

windowState. setCurrentNamespa(window)

windowState. add (element getvalue ()

triggerContext. key key:

triggerContext. window window;

TriggerResult triggerResult -triggerContext. onElement (element):

(triggerResult.isFire ())

ACC contents windowState. get (

if (contents null){

continue;

emitWindowContents (window, contents):

if (triggerResult.isPurge))

windowstate. clear();

registercleanupTimer(window)

Window late 这个数据晚到了,就直接忽略掉。Window State 里面有 and 和 set,自己处理的时候,没有实际本质区别。把它塞到 State 里面去,然后 Flink 框架提供保证数据一致性。

我们会跑两个 Demo,一个是 session Window,Flink exemple 里面会还有output。 

把日志过滤掉,把 root logger= off 关掉,他会把很多其他一些东西,对于展示,或理解逻辑来说,那些日志不是必须的,可能会造成一些干扰,所以关掉。

首先看一个直观的结果:

input, add(new Tuple3<>("a", 1L, 1))

input.add(new Tuple3<>("b", 1L, 1))

input. add(new Tuple3<>("b", 3L, 1)):

input. add(new Tuple3<>("b", 5L, 1))

t. add (new Tuple3>("a", 1L, 2))

input. add(new Tuple3>("c", 6L, 1))

t to detect the session "a" earlier than this point (the old

1/ functionality can only detect here when the next starts)

input.add (nex Tuple3>("a", 10L, 1))

expect to detect session "b and "c" at this point as well

Tunle3("",11l1); 

如果没有看过 exemple,先看一下。第一个是t,第二个表示带的时间戳。第三个是 Count。

结果:

(a,1,1)

(b,1,3)

(c,6,1)

(a,10,1)

(c,11,1)

a表示 Window start 出现了一次,session Window 看到的是1,剩下三个会合并成一个 Window。

Window 里面会输出这样的结果。

input. add(new Tuple3<>("a",1L,2))这一条数据其实没有的,它是晚到的。结婚里面有 sum,然后 print,就会打印到终端。

(a,1,2)发现没有了。

为了直观感受到,加一点日志,表示当前的 watermark 是什么。触发后会触发哪一个 timer。最好比对代码和结果来学习,看一下结果。

因为时间有限,watermark 没有具体介绍,以及多种 watermark 的方式。

value.f1-1 表示时间戳-1。通过加了日志,设置并发的是1,所以 abc 会都进来,因为是同一个并发。b是1的时候,watermark 本身就是0了,它就不会 advance。a是1,gap 是3,结束。会触发 timer。

Timer(timestamp-3, key (a),

nameppace-TimeWindow( start-1, end-4)

然后输出,sum 之后 print 就可以输出为1。

/Library/Java/JavavirtualMachines/jdk1. 8.

162.jdk/Contents/Home/bin/java

Printing result to stdout. Use --output to specify output path

Advanced watermark 0.

Advanced watermark 2.

Advanced watermark 4.

Timer{timestamp-3, key(a), namespac=TimeWindow{ start=1, end-4))

a,1,1

Advanced waterhark 5.

Advanced watermark 9.

Timer{timestamp-7, key=(b), namespace=TimeWindow{start=1, end=8))

tb,1,3

Timer{timestamp-8, key=(c); namespace=TimeWindow(start-6, end=9)}

tc,6,1

Advanced watermark 10.

Advanced watermark9223372036854775807

Timer(timestamp-12, key-(a) namespace-TimeWindow(start=10, end=13))

有限流,框架会发一个终止的地方。

对于无限流来说,一直没有触发最后一个 Window。需要一个超过 Windowend Time 的 watermark,否则不会被触发。

把日志关掉,看一下结果。

窗口大小3,这条数据是延迟了,看 sideoutput,Time 是两秒,把 side output 加上去之后,得到一个结果。

把 aggregated 给注释掉,不然会有干扰。

final boolean tileOutput params, has value: "output")

tinal List> input new ArrayListo>()

Enput. add [new Tuple3<>("a", 1L, 1))

Enput. add(new Tuple3<>("b", 1L, 1))

input. add (new Tuple3<>("b", 3L, 1))

Anput. add( Tuple3>("", 5L, 1))

ple3("a",1l,2));

input. add (new Tuple3<>("c", 6L, 1))

to detect the sess Don "a" earlier than this point (the old

onality can only detect here when the next starts)

input. add(new Tuple3<>("a", 10L, 1))

expect to detect session "b" and "c" at this point as well

Enput. add( Tuple3<>("c", 11L, 1))

brary/Java/JavaVirtualMachinesjdk1. 8.0 162. jdk/Contents/Home/bin/java

watermark 没有讲到的东西,可以看到 watermark 从哪里来,一直跟到以下代码。

we acknovledge and output the new overall watermark if it really is a aggregated

//fron some remaining aligned channel, and is also larger than the last output watermark

if(hasAlignedchannels newMinWatermark>hastoutputWatermark haslastoutputWatermark true

newMinWatermark: newMinNatermark: 0

output Handler. handleWat Watermark( lastOutputWatermark))

如果自己不太了解,相信加端点,进行调试。

 

四、生产环境中的 Wndow 使用遇到的一些问题

1.壹舟:

在 Periodic Watermarks 情况下。如果把 watermarkSystem 设置成. currentTimeMillis,每隔一个周期 fink 递增的获取 watermark 系统时间作为,是不是意味着小于系统时间的窗口都会立即被触发?

通过 contest,得到 source 的 watermark 的时间戳。对齐后计算新的watermark,看 watermark 是否需要触发窗口。时间戳小于当前 watermark,当然窗口就结束了。

2. 请教一个 state 的间题,在官同的例子, nalzedsta 方法,把 stat 里面的值放到本地变量这样做有什么好处?

没有明显的好处,根据自己需求。

3.请教一个问题,需要在  OpeartorHBase 中查询,但是每条数据查询一次效率很低,所以希望能够攒一批数据再batchHBase 方式查询这个场景是不是可以通过 Window攒批呢?

需要在实际应用中看,不断优化。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
94 2
|
4月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
121 2
Hadoop学习笔记(HDP)-Part.18 安装Flink
|
5月前
|
API 数据安全/隐私保护 流计算
Flink教程(12)- Flink高级API(Time与Watermaker)
Flink教程(12)- Flink高级API(Time与Watermaker)
47 0
|
7月前
|
大数据 API 数据安全/隐私保护
大数据Flink Time与Watermaker
大数据Flink Time与Watermaker
31 0
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
525 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
前端开发 数据可视化 关系型数据库
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
快速学习用 PolarDB - X + Flink 搭建实时数据大屏
344 0
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
|
存储 运维 监控
如何开通实时计算 Flink 版|学习笔记(三)
快速学习如何开通实时计算 Flink 版
290 0
如何开通实时计算 Flink 版|学习笔记(三)
|
机器学习/深度学习 SQL 人工智能
实时计算 Flink 训练营场景与应用|学习笔记(三)
快速学习实时计算 Flink 训练营场景与应用
269 0
实时计算 Flink 训练营场景与应用|学习笔记(三)
|
SQL 存储 搜索推荐
实时计算 Flink 训练营场景与应用|学习笔记(二)
快速学习实时计算 Flink 训练营场景与应用
168 0
实时计算 Flink 训练营场景与应用|学习笔记(二)
|
SQL 存储 弹性计算
实时计算 Flink 与你相约阿里云|学习笔记(二)
快速学习实时计算 Flink 与你相约阿里云
182 0
实时计算 Flink 与你相约阿里云|学习笔记(二)