Stream Processing with Apache Flink | 学习笔记(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Stream Processing with Apache Flink

开发者学堂课程【开源 Flink 极客训练营Stream Processing with Apache Flink】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/760/detail/13338


Stream Processing with Apache Flink


2、Fllink 中的状态原语

抛弃编程语言中原生的数据容器(List、Map等),使用 Flink 状态原语

状态原语的状态是在写代码的时候所应用的怎么去写才能够去使用 Flink 里面的状态。基本的原则非常的简单是在编程的时候只需要抛弃原生语言中所提供的数据容器,List map 等等,把这些内容更换为 Flink 里面的状态原语。作为对于状态知识比较好的系统 Frink内部提供很多种的状态源于去使用的,从大的方面上来分所有的状态原语可以分为 Keyed State,Operators State 两类,Operators State 应用对比较少,感兴趣可以去官网上看相关的文档。重点将 Keyed State,分区状态分析状态的好处是可以把有的状态按照逻辑所提供的分区分成不同的块,不同的状态块每一个计算和每一块的计算还有状态都是绑定在一起的,不同的 K值之间的计算还有状态的读写都是隔离开的,每一块只需要一个配置,只需要管理好自己的计算逻辑自己的状态就可以,不需要考虑其他 K 值对应的逻辑或者状态,对于 Keyed State 进一步的划分可以分为五类啊,比较常用的是 ValueSdate,第二个是 ListState ,第三个是 MapState 比较常用。这三个是对应单机的编程里对应的一个值 ValueState,一个列表是 ListState,map 对应 MapState 。  

3、KeyedState 使用方法

1.只能用于 RichFunction

2.将 State 声明为实例变量

3.在 open()方法中为 State 赋值

创建一个 StateDescriptor

利用 getRuntimeContext().getState(..)获得 State

4.调用 State 的方法进行读写(例如:state.value()、state.update(...))

KeyedState 四个步骤,首先有一个前提是所有的 KatedStayed 只能应用在 RichFunction 中使用,RichFunction 和普通的方式、简单的方式不同,它有自己的生命周期,涉及到 open,processful在整个执行完毕之后还有 close 方法提供几个回调方法,供自己去实现,可以把自己的逻辑放在三个方法里,系统进行不同阶段操作的时候会调用自己编写的逻辑,达到 RichFunction 条件之后可以实际去使用,第一步要将 state 去声明成为 RichFunction 里的一个实例变量,在 RichFunction 对应的 Open 方法为 state 进行初始化的赋值操作,要经过两步:第一步要创建去描述状态的 Descriptor 。 Descriptor 需要指定一个名称,名称是整个 state 全局唯一的一个名称,得到 StateDescription 之后,可以调用 RichFunction 的 getRuntimeContext().getState(...)把刚定义的 StateDescription 传进去,可以获取到 state ,注意如果是流失应用第一次运行,获得的 state 内容为空,如果 state 从某一个中间的过程重新启动,根据配置获得的数据是之前保存的,系统里写好的数据,可以拿到已有的数据在此基础上将进行进一步操作。最后一步得到 state 对象之后可以在 RichFunction 里对应的 map 方法、process 方法对CN 进行一些读写,如果是 values 可以调用 value 的方法来获取值,同时还可以调用 state update 方法去更新它已有的值。这些操作都不太需要去考虑并发的问题,因为 Flink 框架自身会控制好所有的状态的并发访问的一些对它进行一些限制,不需要用户去过多的考虑。

/**

* 6. Sum with state.

*/

public static void state() throws Exception {

StreamExecutionEnviroment =StreamExecutionEnvirorment.g etExecutianEnvironmentl): e.fromCollection(data)

.keyBy(v -> \ 2)

.process(new KeyedbrocessFunction(){

private ValueState

@Override

public void open(Configuration parameters) throws Exception {super.open(paraneters);

ValueStateDescriptor sumDescriptor = new ValueStat eDescriptore(

"Sum",

Integer.class);

sumState =getRuntimeContextO.getState(sumDescriptor);

}

@Override  

public woid processelement(Integer value, Context ctx, CollectoreInteger> out) throws Exception {

Integer oldsum=sunState.valwe();

int sum=oldsun=null?0:oldSum ;

sum+=value;

sumstate.update(sum);

out.collect(sum);

}

}).print().setParallelisn(2);

e.execute();

/**

* 7. Processing time tuubling window.

*/

public static void presessingTinchindew() throws Exception {

StreamExecutionEnviromment e=StreamExecutionEnvirormen t.getExecutionEnvironment();  

DataStreamSourcesInteger> source =e

.add5ourcelnew 5ourceFunction-Integer>(){

private volatile boolean stop = false;  

@Override

public void runlSourceContext ctx]throws Exception {

int1=8;

white (1stop 5& 1 < data.size()) {

ctx.collect(data.get(i++));

Thread.sleep(200);

}

}

@Override  

public void cancell) { stop =true; }  

}).setParallelism(1);e.setStreanTineCharacteristic(TineCharacteristic.ProcessingTime);source.keyBy(wv2).processnewKeyedProcessFunction(){

private static final int wINDOwSIZE= 200;  

private TreeMapcLong,Integerwindows;

@Override

public void open(Configaration parameters)throws Exception{

super.open(paraneters);

windows=new TreeMap-o();

@Override

public void processElement(Integer valve, Context ctx, Collector out] {

Long currentTime =ctx.timerServicel).currentProcessingTime(); long windowstart =currentTime / WINDOWSIZE;

// Update the window

int sum= windows.getOrDefault(windowStart,0)  

windows.put(windowstart,sum+value);

// Fire old windows

Mapoldwindows=windows.headMap(windowStart;false);

Iterator

out.collect(iterator.next().getValue());  

iterator.remove();

}

}

@Override

public void close() throws Exception {

super.close();

Systen.out.println(windows);

}

}).print().setParallelism(2);  

e.execute();

}

使用 Flink 里提供的状态来进行累加的示例,累加为了简化没有进行过多额外的操作,传入的某一个数据集,把所有数据集里面的数据进行分组,k 的操作一定要对数据进行按照 k 值的分组,简单的按照奇偶数进行划分,把所有的奇数化为一组,所有的偶数划为一组,分组之后没有使用系统提供的 sum 聚合方法,而是通过手写 k 的 process 方式。具体看一下命令,首先把所用到的状态声明成为一个实例变量,因为存取的 sum 值为单值选用 valuce 方法,state 存储类型为 Int 类型。在 open 方法里面是继承 RichFunction 的接口,然后可以通过 open 方法里边的逻辑来创建聚合方法的description,然后对于 description 把它传到 getstate 方法里来获取 somestate 对象,获取完毕之后可以在 ProcessFunction的方法去用, state 具体而言首先拿到一个已有数字,调用 some state 方法,然后需要判断一下对象是否为空,因为要注意第一次来访问对象状态的时候,里边什么内容都没有,所以应该是为空,就赋一个初始值零,如果不为空把它值给取出来,取出来之后把新传入的数据加到已有的 sum 里面,然后更新一下已有的状态,同时把已经计算好的新的 sum 值发送出去,和单机要求和的逻辑没有太大区别,唯一的区别是把存储 sum 从一个单机的变量转换成 state 对象。Process 处理逻辑完成之后可以调用 print 方法,print 方法是一种简写的形式相当于在整个的处理逻辑最后添加一个PintSink Function 两者基本上是等价的,为了让结果有一个更清晰的输出效果,将最后输出函数 PintSink Function 并行度设为2,因为前面分为奇偶数是分别进行求和的操作,设为2能够更清晰地看到最终的输出结果,最后调用 execute 方法被逻辑提交到本机的集群上执行,演示一下结果

/Library/Java/JavaVirtualMachines/jdk1.8.6_101.jdk/Contents/Hone/bin/java...

2>1

2>4

2>9

2>16

1>2

1>6

1>12

1>20

1>30

2>25

Process finished with exit code 0

所有的基数加起来是一加三是四、四加五是九、九加七是十六,然后二加二是四、六加六是十二、然后十二加八是二是,最后是十六加九是二十五,一三五七九、二四六八十最终结果看最大的,一输出的是30,二输出的是25,相当于奇数加的所有结果是25,偶数所有的结果是30,最终两个合并起来55,是从一加到十的整体的结果,可以看到一旦对数据进行奇偶数分组之后是互相独立完全没有影响。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
195 1
|
7月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
258 2
Hadoop学习笔记(HDP)-Part.18 安装Flink
|
前端开发 数据可视化 关系型数据库
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
快速学习用 PolarDB - X + Flink 搭建实时数据大屏
用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(三)
|
存储 运维 监控
如何开通实时计算 Flink 版|学习笔记(三)
快速学习如何开通实时计算 Flink 版
如何开通实时计算 Flink 版|学习笔记(三)
|
机器学习/深度学习 SQL 人工智能
实时计算 Flink 训练营场景与应用|学习笔记(三)
快速学习实时计算 Flink 训练营场景与应用
实时计算 Flink 训练营场景与应用|学习笔记(三)
|
SQL 存储 搜索推荐
实时计算 Flink 训练营场景与应用|学习笔记(二)
快速学习实时计算 Flink 训练营场景与应用
实时计算 Flink 训练营场景与应用|学习笔记(二)
|
SQL 存储 弹性计算
实时计算 Flink 与你相约阿里云|学习笔记(二)
快速学习实时计算 Flink 与你相约阿里云
实时计算 Flink 与你相约阿里云|学习笔记(二)
|
传感器 存储 Shell
走进 Apache Flink(二)|学习笔记
快速学习走进 Apache Flink
220 0
走进 Apache  Flink(二)|学习笔记
|
SQL 消息中间件 存储
Flink SQL_Table 介绍与实战(二)|学习笔记
快速学习 Flink SQL_Table 介绍与实战
260 0
Flink SQL_Table 介绍与实战(二)|学习笔记
|
负载均衡 Java 调度
Flink Runtime Architecture(二)|学习笔记
快速学习 Flink Runtime Architecture(二)
175 0
Flink Runtime Architecture(二)|学习笔记

推荐镜像

更多