3. 聚合状态(AggregatingState)
我们举一个简单的例子,首先自定义一个产生随机整数的自定义数据源,然后进行累加。当累加到999时,清空聚合状态变量,然后重新累加。可以看到我们这里使用RichFlatMapFunction实现了sum的功能。
public class AggregateStateExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new SourceFunction<Tuple2<String, Integer>>() { private boolean running = true; private Random random = new Random(); @Override public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception { while (true) { sourceContext.collect(Tuple2.of("key", random.nextInt())); } } @Override public void cancel() { running = false; } }) .keyBy(r -> r.f0) .flatMap(new CountFunction()) .print(); env.execute(); } public static class CountFunction extends RichFlatMapFunction<Tuple2<String, Integer>, Integer> { private int count = 0; // 声明聚合状态变量 private AggregatingState<Tuple2<String, Integer>, Integer> aggregatingState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); AggregatingStateDescriptor<Tuple2<String, Integer>, Integer, Integer> descriptor = new AggregatingStateDescriptor<Tuple2<String, Integer>, Integer, Integer>( "aggregatingState", new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2<String, Integer> value, Integer accumulator) { return accumulator + 1; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return a + b; } }, Types.INT); aggregatingState = getRuntimeContext().getAggregatingState(descriptor); } @Override public void flatMap(Tuple2<String, Integer> value, Collector<Integer> out) throws Exception { count++; if (count % 1000 == 0) { out.collect(aggregatingState.get()); aggregatingState.clear(); // 清空状态变量 } else { // 增量更新AggregatingState,这里每来一个新元素,对ACC累加1 aggregatingState.add(value); } } } }
13.2.4 状态生存时间(TTL)
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。
StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class); stateDescriptor.enableTimeToLive(ttlConfig);
这里用到了几个配置项:
.newBuilder() 状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间。 .setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。 .setStateVisibility() 设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间。
13.3 算子状态(Operator State)
除按键分区状态(Keyed State)之外,另一大类受控状态就是算子状态(Operator State)。从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同key的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法也会有一些区别。
13.3.1 基本概念和特点
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
13.3.2 状态类型
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。
1. 列表状态(ListState)
与Keyed State中的ListState一样,将状态表示为一组数据的列表。与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
2. 联合列表状态(UnionListState)
与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
3. 广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
13.4 状态持久化和状态后端
在Flink的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。
13.4.1 检查点(Checkpoint)
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的id和状态;如果发生故障,Flink就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。关于这部分内容我们会在第10章继续讨论。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment(); env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。关于检查点的详细配置,可以参考第10章的内容。除了检查点之外,Flink还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由Flink自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。
13.4.2 状态后端(State Backends)
检查点的保存离不开JobManager和TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由JobManager向所有TaskManager发出触发检查点的命令;TaskManger收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向JobManager返回确认信息。这个过程是分布式的,当JobManger收到所有TaskManager的返回信息后,就会确认当前检查点成功保存,如图所示。而这一切工作的协调,就需要一个“专职人员”来完成。
在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
1. 状态后端的分类
状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
(1)哈希表状态后端(HashMapStateBackend)
这种方式就是我们之前所说的,把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。
(2)内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。与HashMapStateBackend直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。
2. 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。
3. 状态后端的配置
在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
(1)配置默认的状态后端
在flink-conf.yaml中,可以使用state.backend来配置默认状态后端。配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;也可以是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory的类的完全限定类名。
下面是一个配置HashMapStateBackend的例子:
# 默认状态后端 state.backend: hashmap # 存放检查点的文件路径 state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints 这里的state.checkpoints.dir配置项,定义了状态后端将检查点和元数据写入的目录。
(2)为每个作业(Per-job)单独配置状态后端
每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new HashMapStateBackend());
上面代码设置的是HashMapStateBackend,如果想要设置EmbeddedRocksDBStateBackend,可以用下面的配置方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
而由于Flink发行版中默认就包含了RocksDB,所以只要我们的代码中没有使用RocksDB的相关内容,就不需要引入这个依赖。
十四、容错机制
在分布式架构中,当某个节点出现故障,其它节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,这就需要在架构上做出更加精巧的设计。在Flink中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint)。在第九章中,我们已经介绍过检查点的基本概念和用途,接下来我们就深入探讨一下检查点的原理和Flink的容错机制。
14.1 检查点(Checkpoint)
在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。进一步地,我们知道在有状态的流处理中,任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。检查点是Flink容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点”。
14.1.1 检查点的保存
1. 周期性的触发保存
“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会收到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的。在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置。
所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
2. 保存的时间点
我们应该在所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其它额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。
3. 保存的具体流程
检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的就是:
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…
对应的代码就可以简化为:
SingleOutputStreamOperator<Tuple2<String, Long>> wordCountStream = env.addSource(...) .map(word -> Tuple2.of(word, 1L)) .returns(Types.TUPLE(Types.STRING, Types.LONG)); .keyBy(t -> t.f0); .sum(1);
源(Source)任务从外部数据源读取数据,并记录当前的偏移量,作为算子状态(Operator State)保存下来。然后将数据发给下游的Map任务,它会将一个单词转换成(word, count)二元组,初始count都是1,也就是(“hello”, 1)、(“world”, 1)这样的形式;这是一个无状态的算子任务。进而以word作为键(key)进行分区,调用.sum()方法就可以对count值进行求和统计了;Sum算子会把当前求和的结果作为按键分区状态(Keyed State)保存下来。最后得到的就是当前单词的频次统计(word, count),如图所示。
当我们需要保存检查点(checkpoint)时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。例如上图中,已经处理了3条数据:“hello”“world”“hello”,所以我们会看到Source算子的偏移量为3;后面的Sum算子处理完第三条数据“hello”之后,此时已经有2个“hello”和1个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里KeyedState底层会以key-value形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项“检查点存储”(CheckpointStorage)来决定的,可以有作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。
14.1.2 从检查点恢复状态
在运行流处理程序时,Flink会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。例如在上节的word count示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障,如图所示。
这里Source任务已经处理完毕,所以偏移量为5;Map任务也处理完成了。而Sum任务在处理中发生了故障,此时状态并未保存。接下来就需要从检查点来恢复状态了。具体的步骤为:
(1)重启应用
遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,如图所示。
(2)读取检查点,重置状态
找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,如图所示。