Stream Processing with Apache Flink(二)|学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Stream Processing with Apache Flink

开发者学堂课程【开源 Flink 极速上手教程:Stream Processing with Apache Flink】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/331/detail/3708


Stream Processing with Apache Flink(二)

三、状态和时间

涉及对于流式处理较为精细的步骤

1.有状态的计算

所有的计算可以分为无状态的计算和有状态的计算。对于无状态计算相对而言比较容易,假设有一个算子,自己记不住之前干过的所有的事情,这个算子的工作是每次提供一些数据,然后对于这个数据应用一些预先定义好的计算的逻辑,比如说这个算子就是一个加法算式,每一个进来的一组数据,都把它给全部的加起来,最后把这个结果输出。这一类呢在函数式编程的思想中,经常提到的一点就是纯函数。每次计算结果只和输入数据有关,不会和之前的计算或者说是外部状态也不会产生任何影响。这一计算相对比较容易。

如剪树枝小游戏,游戏从业内人员来分析做的非常好的一点,在于记录了非常多的状态,比如有几天没有上线登录,再上线时和里面的NPC对话的时会告诉已经有好久没有上线了,会记住之前上线的时间,即作为一种状态把这个记录下来,生成对话的内容的时,其实是考虑到这个状态的影响。

相对老一些的游戏,里面的NPC对话都是一种无状态,每次对话的内容都是完全一样的。实现有状态的计算,要做的一点就是把之前的状态给记录下来,再把状态注入到新的一次计算过程中,实现这个方式有两种,第一种可以把这个状态在这个数据进入算子之前提取出来,然把这个状态数据和输入数据合并在一起,这个状态作为一种输入,与原来的输入合并,一同注入到算子中,最后得到一个输出。这种方式在spark中的structure里应用,structure dreaming聚合的实现是这样,会提前把之前的聚合结果添加到新一轮数据中,同时直接发送给这个聚合算子,好处在于重用无状态算子,算子在看到数据和输入数据时,不知道将之前的状态数据混入到这新一批输入的数据中,算子可以保持无状态,用无状态的算子实现有状态的计算,这是第一类方法。

第二类方法是flink现有的这种办法就,算子本身是有状态的算子,每到来新的数据之后,在做计算的时,相当于同时考虑新的数据与已有的状态对于计算过程的影响,最终把结果输出。计算引擎应该像游戏一样变得越来越智能,甚至可以怎自动学习数据中潜在的一些规律,不断修改、优化处理逻辑,计算逻辑,不断的提高处理的性能,脱离了状态的记录是无法完成的。

2.Flink 中的状态原语

状态言语是写代码的时所应用的,如何写才能使用里面的一个状态,基本原则非常简单,抛弃编程语言中原生的数据容器(List、Map等),使用Flink状态原语

Flink内部提供了很多状态原语使用,从大的方面分,state 分为 Keyed State 和Operator State 原语两类,Operator State 应用相对比较少,重点介绍 Keyed State分区状态,分区状态的好处在于将已有的状态按照逻辑提供的分区分成不同的块,不同的状态块,每一个计算和每一块的计算还有状态是绑定在一起的,并且不同的这个key间的计算以及状态的读写是隔离开的,每一个key只需要管理好自己的计算逻辑和自己的状态就可以,不需要考虑其他的key所对应的逻辑或状态,对于Keyed State可以进行进一步的划分,分为图中五类,比较常用的是 valueState、ListSyate、MapState,对于单机的一个值 valueState,一个表 ListSyate,一个 map 对应MapState

3.Keyed State 使用方法

(1)只能用于RichFunction(前提)

RichFunction与普通function不同在于有自己的生命周期,涉及到open、process等,有一个方法,回调方法以便实现,将逻辑放在方法中,系统进行不同阶段操作时,会调用编写的逻辑

(2)将State声明为RichFunction实例变量

(3)在open()方法中为State进行初始化赋值

·创建一个StateDescriptor,StateDescriptor指定一个名称

·利用getRuntimeContext().getState(...)获得State

第一次运行,获得的内容为空,state从中间的过程重新启动,根据配置获得的数据是之前保存的、已经写好的数据、拿到已有的数据进行操作

(4)调用State的方法进行读写(例如: state.value()、state.update(...))

所有操作不需要考虑并发的问题,Flink框架自身会控制好所有状态的并发访问的限制,不需要用户过多的考虑

Flink 编写示例:

使用 Flink 提供的状态进行累加的示例

public static void state() throws Exception {

Stream ExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();

e.fromCollection(data)

.keyBy{v->V%2);

.process(new KeyedProcessFunction<Integer,Integer,Integer>(){

private ValueState<Integer> sumState;

@override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

ValueStateDescriptor<Integer> sumDescriptor = new ValueStateDescriptor<>(

”Sum”

Integer.class);

SumState =getRuntimecontext().getState(sumDescripton);

}

@override

public void processElement(Integer value,Context ctx, Collector<Integer out)throws Exception {

Integer oldSum=SumState.value();

int sum=oldSum == null ?0: oldSum;

Sum += value;

sumState.update(sun);

out.collect(sum);

}

}).print().setParallelism(2);

e.execute();

}

传入数据集,对数据集进行分组,keyed操作对数据集按照key值进行分组,按照奇偶数划分,将所有奇数划为一组,所有偶数划为一组,分组后不使用系统提供的sum方法,而是使用 keyedProcess 方式,keyedProcess 方式内容,首先将用到的状态声明程一个实例变量,存取的 sum 值只有一个单值,

选用 value方法,state 类型是int类型,在open方法中,open 方法是keyedProcessFunction,通过open方法中的逻辑创建一个 SumDescriptor,聚合方法的 Descriptor,对于 Descriptor,传入 getState 方法中,获取 SumState 对象,获取完成后,在 keyedProcessFunction 中 processElement 用 state,拿到已有的数字,调用 sumState.value 方法,

判断 oldsum 对象是否为空,第一次使用返回的对象是空值,做一个初始值0,不为空将值取出,将新传入的数据加到 sum 中,更新已有的状态,将计算的新的 sum 值发送,与单机求和的逻辑没有太大区别,唯一区别是存储sum从单一变量转成state对象,完成后可以调用 print 方法,print 方法是一种简写的形式,在处理逻辑添加一个PrintSinkFunction,二者基本是等价的,

为了让结果有一个更清晰的输出效果,将输出的 PrintSinkFunction 并行度设置为2,能够清晰的看到最终的结果,调用 execute方法将逻辑提交到本地集群执行

演示state方法

最终结果看最大的,奇数是25,偶数30,合并55

对数据按照奇偶数分组后,累加过程是相互独立的,是没有影响的

4.Flink 中的时间

Processing Time Event Time(Row Time)

真实世界的时间 数据世界的时间

处理数据节点的本地时间 记录携带的Timestamp

处理简单 处理复杂

结果不确定(无法重现) 结果确定(可重现)

时间与 state 是相辅相成的关系,Flink引擎提供的时间有两类,一类是Processing Time,一类是Event Time(Row Time),区别在于Processing Time表示真实世界的时间,机械的时间,Event Time是数据中包含的时间,与真实世界的时间没有任何关系,很多时候数据在生成时会携带时间戳之类的字段,需要按照数据本身携带的时间戳作为参考对数据进行分时间的处理,Processing Time 处理相对简单,Event Time处理较为复杂,Processing Time 处理时直接调取系统时间,因为多线程或系统的不确定性每次运行的结果是不确定的,Event Time时间戳被写入数据中,重放数据进行多次处理时,携带的时间戳不会改变,处理逻辑不改变,处理的结果是确定的,可以重现

5.Processing Time 和 Event Time

假设有一些数据,按照从1-7排列,机械时间如何机器时钟没有意外,会永远增加,不会出现第二次调用比第一次调用时间靠前的情况,在这种情况下用 Processing Time 获得的时间是完美的按照时间从小到大排序的一组数序,对于 Event Time 而言,由于延迟或分布式的一些原因,数据到来的顺序与数据真实产生的顺序有一定出入,数据存在一定程度的乱序,充分利用数据中携带的时间戳,对数据进行粗力度的划分,目标是解决数据乱序,或者说在一定程度上缓解数据乱去的问题。假设乱序它不是乱的,分数据乱序不是非常的过分,是在一定的区间范围之内乱序,把数据进行粗力度的划分,然后分为三组,第一组最小的时间是1,第二个最小的时间是4,第三组最长时间是7,按照粗力度划分之后,数据其实是有一定顺序的,从小到大的一个排列,如何充分的把这种一定程度的乱序化解掉,使整个系统看上数据是基本上有顺序,需要在数据的中间插入一些被称为 Watermark 的数据,一旦前三个数据到来后,假设判断没有小于等于3的数据进入,插入一条到整个数据中,系统看到Watermark3数字时,之后不会有小于等于3的数据进入,进行自己的处理逻辑,系统进入Watermark6,代表不会有小于等于6的数据进入,继续对已有数据进行处理逻辑。

Processing Time 使用时是严格递增的,Event Time 存在乱序,通过 Watermark 对乱序进行一定的缓解

6.Timestamp 分配和 Watermark 生成

从 API 角度如何分配 Timestamp 和生成 Watermark,相对较为容易,有两种方式,在定义整个处理逻辑是,引入一个 SourceFunction,调用环境方式方法将SourceFunction 添加进入,在 SourceFunction 中调用 collectWithTimestamp 方法,将记录中包含的时间戳提取出来,在S ourceFunction 中使用 emitWatermark 产生一个 Watermark,如看到某个记录,以后比时间戳小的数据不会进入,在SourceFunction 引用 emitWatermark 插入到数据流中,第二种方式调用DataStream.assignTimestampsAndWatermarks,同时传入两类 Watermark 生成器,第一类是定期生成,相当于在环境中配置一个值每隔多长时间,系统自动调用Watermark 提供的 Watermark 生成策略,提供的时间是现实世界的时间,每过一段时间触发 Watermark 生成的方式,第二章与定期生成相对是根据特殊记录生成,看到某些特殊数据到来时,对应时间比它小的都不会到来,进行时间戳或Watermark分配。

·在 SourceFunction 中产生

collectWithTimestamp(T element, long timestamp)

emitWatermark(Watermark mark)

·在流程中指定

DataStream.assignTimestampsAndWatermarks(...)

定期生成 根据特殊记录生成

现实时间驱动 数据驱动

每隔一段时间调用生成方法 每一次分配Timestamp都会调用生成方法

实现 AssignerWithPeriodicWatermarks 实现AssignerWithPunctuatedWatermarks

还有一点是 Flink 里面内置的一些常用的 signer, watermark、assigner,比如它有一个内置的固定时间,拿到一个数据之后把这个数据对应的时间戳减去一块固定的时间,作为一个 watermark,比如看到一个时间为1000的,可能就知道小于等于800的不会再来了,就类似于这样一种逻辑。

关于 Timestamp 分配和 Watermark 生成的接口,可能在后续的版本也会有一定的改动,因为之前在邮件列表里有相关的讨论。

7.时间相关 API

目标 Event Time  Processing Time  

获取记录事件获取 context.getTimestamp(或从数据字段中获取 timerService).currentProcessingTime()

watermark” timerService().currentWatermark() timerService().currentProcessingTime()

注册定时器 timerService.registerEventTimeTimer() timerService.registerProcessingTimeTimer()

按照 event time 还有 processing time 进行区分。其实要在应用逻辑里通过这个接口支持,需要做三件事,第一件事就是获取记录的时间,获取记录的时间就是可以调 context.getTimestamp(),如果是 cycle、table,它会直接从数据字段中把对应的时间给提取出来,相对而言,Processing time 就直接调 currentprocessingtime 就可以,service 的内部直接调用了获取系统时间的简单方法来返回值。除了获取记录的时间,还需要获取“watermark”,这里把 watermark 加了一个引号,因为只有 event time 里才有 watermark 的概念,而 processing time 里没有 watermark 的概念,但是如果非要把一个东西当成是它的 watermark,其实就是数据时间的本身,即第一次调用 timerservice().currentpercentingtime 方法之后,获取到一个值,这个值既是当前记录的时间,也是当前的 watermark 值,因为时间总是在往前流的,第一次调用完这个值之后,第二次调用这个值肯定不会再比第一次的值还小,这是获取 watermark。

第三个,可以在一些相关的方法里注册一些定时器,定时器的作用,如有一个 cash,对于cash在某今后的某一个时间,想对它进行一些类似于清理或者其他工作,知道清理应该发生在将来的某一个时间点,就可以调用这个timeService.registerEventTimeTimer(),或 ProcessingTimeTimer(),注册定时器后,要在整个方法里添加一个对于定时器的一个回调的处理逻辑,每当对应Event time 或 processing time的时间超过了定时器的时间,会调用在方法种自己编写的定时器的回调逻辑,将 cash 的清理或者一些其他的工作放到这个逻辑里面去完成

关于时间的使用,查看一个演示案例

定义了一个 tumbling window,一个翻滚的窗口,窗口和窗口之间是没有重合的,基于processTime完成,第一步获取一个运行环境,新添加了一个数据源,没有在直接定义的data中取数据,因为涉及到时间,为了效果需要在这个数据注入的过程中,人为的去引入了一些延迟。基本的逻辑是便利 data 集合从里边获取数据,只不过每一次获取数据后,引入200毫秒的延迟,这就是简单的一个 sourceFunction,如果用processing time 要注意环境中要设置它的这个时间特性啊,processing time是默认的一个特性,可有可无,为了表示它的效果,把它加上,如果要用envent time,注意一定要设置一下,将时间特性设置 einvent time,设置完成后,从sourceFunction进行出发,进行数据的处理,首先按照奇偶的复字进行分组,定义keyedprocessfunction 中,具体在这个 keyedprocessfunction里未使用系统内部的一个方法,使用 TreeMap 存储 window,size设置成200,下面是毫秒,在open方法用于存取不同窗口的时间,数值等数据结构,进行初始化,每到来一数据对首先通过调用方法获取当前的时间,对到来的这个数据进行一个窗口的划分,用窗口的开始时间作为窗口的一个唯一的标识进行划分。直接用当前的时间除以这个窗口的大小,取整,可以得到这个窗口到开始的时间。

要对这个数据窗口里的数据进行一个 sum 操作,为了获取这个窗口 sum 值,调用map方法,如果窗口里边的值已经存在,会返回已有的值,如果没有存在的话,返回0,更新窗口累加和的数值,进行一些判断,没有用到timer,直接在处理的逻辑中判断,将所有的小于开始的旧窗口的值拿出,旧窗口累加和的值不会再更新,然后把结果发送,删除窗口。

close 方法在整个处理完成之后,会自动调用,为了防止还有一些没有来得及触发的窗口,它的数值看不到,把所有剩下的这些窗口里面剩下值输出,把结果打印到控制台上,并且还将并行度设置为2,执行

可以看出来,因为在数据源窗口,每发送一条数据都给它加上一个200毫秒的间隔,这边窗口大小也是200毫秒,所以它基本上就是每过来一条数据都会触发一个新的窗口。

如果改成400再来运行,

可以看出来,基本上就是两个数字一加这样一种效果。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1425 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1369 1
官宣|Apache Flink 1.19 发布公告
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
147 3
|
2月前
|
Oracle 关系型数据库 流计算
flink cdc 同步问题之报错org.apache.flink.util.SerializedThrowable:如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0
|
2月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1
|
2月前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
62 0
|
2月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
67 0
|
SQL 架构师 API
《Apache Flink 知其然,知其所以然》系列视频课程
# 课程简介 目前在我的公众号新推出了《Apache Flink 知其然,知其所以然》的系列视频课程。在内容上会先对Flink整体架构和所适用的场景做一个基础介绍,让你对Flink有一个整体的认识!然后对核心概念进行详细介绍,让你深入了解流计算中一些核心术语的含义,然后对Flink 各个层面的API,如 SQL/Table&DataStreamAPI/PythonAPI 进行详细的介绍,以及
1289 0
《Apache Flink 知其然,知其所以然》系列视频课程
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
487 5

推荐镜像

更多