开发者学堂课程【开源 Flink 极客训练营:Stream Processing with Apache Flink】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13338
Stream Processing with Apache Flink
2、Fllink 中的状态原语
抛弃编程语言中原生的数据容器(List、Map等),使用 Flink 状态原语
状态原语的状态是在写代码的时候所应用的怎么去写才能够去使用 Flink 里面的状态。基本的原则非常的简单是在编程的时候只需要抛弃原生语言中所提供的数据容器,List map 等等,把这些内容更换为 Flink 里面的状态原语。作为对于状态知识比较好的系统 Frink内部提供很多种的状态源于去使用的,从大的方面上来分所有的状态原语可以分为 Keyed State,Operators State 两类,Operators State 应用对比较少,感兴趣可以去官网上看相关的文档。重点将 Keyed State,分区状态分析状态的好处是可以把有的状态按照逻辑所提供的分区分成不同的块,不同的状态块每一个计算和每一块的计算还有状态都是绑定在一起的,不同的 K值之间的计算还有状态的读写都是隔离开的,每一块只需要一个配置,只需要管理好自己的计算逻辑自己的状态就可以,不需要考虑其他 K 值对应的逻辑或者状态,对于 Keyed State 进一步的划分可以分为五类啊,比较常用的是 ValueSdate,第二个是 ListState ,第三个是 MapState 比较常用。这三个是对应单机的编程里对应的一个值 ValueState,一个列表是 ListState,map 对应 MapState 。
3、KeyedState 使用方法
1.只能用于 RichFunction
2.将 State 声明为实例变量
3.在 open()方法中为 State 赋值
创建一个 StateDescriptor
利用 getRuntimeContext().getState(..)获得 State
4.调用 State 的方法进行读写(例如:state.value()、state.update(...))
KeyedState 四个步骤,首先有一个前提是所有的 KatedStayed 只能应用在 RichFunction 中使用,RichFunction 和普通的方式、简单的方式不同,它有自己的生命周期,涉及到 open,processful在整个执行完毕之后还有 close 方法提供几个回调方法,供自己去实现,可以把自己的逻辑放在三个方法里,系统进行不同阶段操作的时候会调用自己编写的逻辑,达到 RichFunction 条件之后可以实际去使用,第一步要将 state 去声明成为 RichFunction 里的一个实例变量,在 RichFunction 对应的 Open 方法为 state 进行初始化的赋值操作,要经过两步:第一步要创建去描述状态的 Descriptor 。 Descriptor 需要指定一个名称,名称是整个 state 全局唯一的一个名称,得到 StateDescription 之后,可以调用 RichFunction 的 getRuntimeContext().getState(...)把刚定义的 StateDescription 传进去,可以获取到 state ,注意如果是流失应用第一次运行,获得的 state 内容为空,如果 state 从某一个中间的过程重新启动,根据配置获得的数据是之前保存的,系统里写好的数据,可以拿到已有的数据在此基础上将进行进一步操作。最后一步得到 state 对象之后可以在 RichFunction 里对应的 map 方法、process 方法对CN 进行一些读写,如果是 values 可以调用 value 的方法来获取值,同时还可以调用 state update 方法去更新它已有的值。这些操作都不太需要去考虑并发的问题,因为 Flink 框架自身会控制好所有的状态的并发访问的一些对它进行一些限制,不需要用户去过多的考虑。
/**
* 6. Sum with state.
*/
public static void state() throws Exception {
StreamExecutionEnviroment =StreamExecutionEnvirorment.g etExecutianEnvironmentl): e.fromCollection(data)
.keyBy(v -> \ 2)
.process(new KeyedbrocessFunction(){
private ValueState
@Override
public void open(Configuration parameters) throws Exception {super.open(paraneters);
ValueStateDescriptor sumDescriptor = new ValueStat eDescriptore(
"Sum",
Integer.class);
sumState =getRuntimeContextO.getState(sumDescriptor);
}
@Override
public woid processelement(Integer value, Context ctx, CollectoreInteger> out) throws Exception {
Integer oldsum=sunState.valwe();
int sum=oldsun=null?0:oldSum ;
sum+=value;
sumstate.update(sum);
out.collect(sum);
}
}).print().setParallelisn(2);
e.execute();
/**
* 7. Processing time tuubling window.
*/
public static void presessingTinchindew() throws Exception {
StreamExecutionEnviromment e=StreamExecutionEnvirormen t.getExecutionEnvironment();
DataStreamSourcesInteger> source =e
.add5ourcelnew 5ourceFunction-Integer>(){
private volatile boolean stop = false;
@Override
public void runlSourceContext ctx]throws Exception {
int1=8;
white (1stop 5& 1 < data.size()) {
ctx.collect(data.get(i++));
Thread.sleep(200);
}
}
@Override
public void cancell) { stop =true; }
}).setParallelism(1);
e.setStreanTineCharacteristic(TineCharacteristic.ProcessingTime);
source.keyBy(wv2).processnewKeyedProcessFunction(){
private static final int wINDOwSIZE= 200;
private TreeMapcLong,Integerwindows;
@Override
public void open(Configaration parameters)throws Exception{
super.open(paraneters);
windows=new TreeMap-o();
@Override
public void processElement(Integer valve, Context ctx, Collector out] {
Long currentTime =ctx.timerServicel).currentProcessingTime(); long windowstart =currentTime / WINDOWSIZE;
// Update the window
int sum= windows.getOrDefault(windowStart,0)
windows.put(windowstart,sum+value);
// Fire old windows
Mapoldwindows=windows.headMap(windowStart;false);
Iterator
out.collect(iterator.next().getValue());
iterator.remove();
}
}
@Override
public void close() throws Exception {
super.close();
Systen.out.println(windows);
}
}).print().setParallelism(2);
e.execute();
}
使用 Flink 里提供的状态来进行累加的示例,累加为了简化没有进行过多额外的操作,传入的某一个数据集,把所有数据集里面的数据进行分组,k 的操作一定要对数据进行按照 k 值的分组,简单的按照奇偶数进行划分,把所有的奇数化为一组,所有的偶数划为一组,分组之后没有使用系统提供的 sum 聚合方法,而是通过手写 k 的 process 方式。具体看一下命令,首先把所用到的状态声明成为一个实例变量,因为存取的 sum 值为单值选用 valuce 方法,state 存储类型为 Int 类型。在 open 方法里面是继承 RichFunction 的接口,然后可以通过 open 方法里边的逻辑来创建聚合方法的description,然后对于 description 把它传到 getstate 方法里来获取 somestate 对象,获取完毕之后可以在 ProcessFunction的方法去用, state 具体而言首先拿到一个已有数字,调用 some state 方法,然后需要判断一下对象是否为空,因为要注意第一次来访问对象状态的时候,里边什么内容都没有,所以应该是为空,就赋一个初始值零,如果不为空把它值给取出来,取出来之后把新传入的数据加到已有的 sum 里面,然后更新一下已有的状态,同时把已经计算好的新的 sum 值发送出去,和单机要求和的逻辑没有太大区别,唯一的区别是把存储 sum 从一个单机的变量转换成 state 对象。Process 处理逻辑完成之后可以调用 print 方法,print 方法是一种简写的形式相当于在整个的处理逻辑最后添加一个PintSink Function 两者基本上是等价的,为了让结果有一个更清晰的输出效果,将最后输出函数 PintSink Function 并行度设为2,因为前面分为奇偶数是分别进行求和的操作,设为2能够更清晰地看到最终的输出结果,最后调用 execute 方法被逻辑提交到本机的集群上执行,演示一下结果
/Library/Java/JavaVirtualMachines/jdk1.8.6_101.jdk/Contents/Hone/bin/java...
2>1
2>4
2>9
2>16
1>2
1>6
1>12
1>20
1>30
2>25
Process finished with exit code 0
所有的基数加起来是一加三是四、四加五是九、九加七是十六,然后二加二是四、六加六是十二、然后十二加八是二是,最后是十六加九是二十五,一三五七九、二四六八十最终结果看最大的,一输出的是30,二输出的是25,相当于奇数加的所有结果是25,偶数所有的结果是30,最终两个合并起来55,是从一加到十的整体的结果,可以看到一旦对数据进行奇偶数分组之后是互相独立完全没有影响。