Flink(十一)【状态管理】(3)https://developer.aliyun.com/article/1532241
3.2、联合列表状态(UnionListState)
与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
并行度 = 2 算子1 算子2 1 2 3 4 5 6 并行度 2->3 //普通ListState: 算子1 算子2 算子3 1 2 3 4 5 6 //UnionListState: 算子1 算子2 算子3 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 6 6 6
可以看到,当我们的分区进行重新调整时,ListState 会把数据先搜集在一起,再重新以轮询的方式进行数据的分配,而 UnionListState 同样会把数据先搜集在一起,再把全量数据以广播的形式分配给每个并行算子。
只需要修改上面的代码为:
@Override public void initializeState(FunctionInitializationContext context) throws Exception { // 3.1 从上下文初始化算子状态 countState = context.getOperatorStateStore() .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG)); // 3.2 从算子状态中把数据拷贝到本地变量 if (context.isRestored()) { // 如果初始化状态成功 for (Long c : countState.get()) { count += c; } } }
3.3、广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。
案例-水位超过阈值发送警告,阈值可以动态修改
public class OperatorBroadcastState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // 数据流 SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); // 广播流 - 用来广播配置-动态修改水位报警阈值 DataStreamSource<String> thresholdDS = env.socketTextStream("localhost", 8888); // TODO 1. 将 配置流 广播 // 返回一个带有广播状态的广播流 MapStateDescriptor<String, Integer> broadcastMapDescriptor = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT); BroadcastStream<String> configDS = thresholdDS.broadcast(broadcastMapDescriptor); // TODO 2. 把 数据流 和 配置流 connect BroadcastConnectedStream<WaterSensor, String> sensorCS = sensorDS.connect(configDS); // TODO 3. 调用process sensorCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() { // 处理数据流 @Override public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception { // TODO 5. 通过上下文获取广播状态,取出状态中的值 // 广播状态对数据流只能读 ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapDescriptor); Integer threshold = broadcastState.get("threshold"); // 防止数据流已经来数据了,但广播流开始未指定阈值 threshold = threshold==null?0:threshold; if (value.getVc()>threshold){ out.collect("传感器 "+value.getId()+" 的水位超过指定的阈值: "+threshold+" !!!"); } } // 处理广播流 @Override public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { // TODO 4. 通过上下文获取广播状态, 往状态里面写数据 BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapDescriptor); broadcastState.put("threshold",Integer.valueOf(value)); } }).print(); env.execute(); } }
测试:
netcat(9999端口) | console ------------------------------------------------- s1,1,1 1> 传感器 s1 的水位超过指定的阈值: 0 !!! 5 // 8888端口修改阈值为5 s1,1,1 s1,1,4 s1,1,5 s1,1,6 1> 传感器 s1 的水位超过指定的阈值: 5 !!!
4、状态后端(State Backends)
在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。
状态后端主要负责管理本地状态的存储方式和位置。
4.1、状态后端的分类
状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。
Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是 HashMapStateBackend。
(1)哈希状态后端(HashMapStateBackend)
这种方式会把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。
(2)内嵌 RocksDB 状态后端(RocksDB)
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。
与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。
对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。
由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。
4.2、如何选择正确的状态后端
HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。
HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。
我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。
4.3、状态后端的配置
在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
(1)修改默认的状态后端配置
//默认状态后端 state.backend: hashmap // 或者修改为 rocksdb state.backend: rocksdb //存放检查点的文件路径 state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
(2)为每个作业(Per-job/Application模式)单独配置状态后端
指定状态后端为 HashMapStateBackend:
env.setStateBackend(new HashMapStateBackend());
指定状态后端为 EmbeddedRocksDBStateBackend:
env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
而由于 Flink 发行版中(lib/flink-dist-1.17.0.jar)默认就包含了 RocksDB,所以我们打包项目的时候不需要打包这个依赖。
(3)提交参数时指定
flink run-application -t yarn-application -p 3 -Dstate.backend.type=rocksdb -c 全类名 jar包
总结
到今天10点,终于是把专业课的期末考完了,不由得要吐槽一下这试卷实在劣质,上午考的 Spark 考得一堆角落里没有的边角料,Spark 正二八经的核心重点几乎是没考,比如RDD 血缘关系、运行架构 ... 反倒是考了一堆细枝末节(Scala 的 for 花样循环守卫 ...),无语无语。还有一堆事情让人焦虑 ...
最近确实是累,断更 7 天了,这不下午两点都没睡就又来学习了。宿舍实在是人间炼狱,来了自习室以为会好点,就这功夫,左边的哥们又开始抖腿了,前边坐着的情侣实在碍眼,来自习室的路上看到了一对对情侣,想想咱也读了次大学,混的连个对象都没有,这不能怪咱不争取,属实是这大环境不好,毕业都眼瞅着吃土了还哪有那心思呢。
眼下每一次偷懒和懈怠都是罪恶的,愈发觉得时间的珍贵,现在努力学习保持单身才是对自己、对自己未来老婆孩子的负责哇。