Flink(十二)【容错机制】(3)https://developer.aliyun.com/article/1532258
3、 端到端精确一次(end-to-end exactly-once)
实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”(exactly-once)。我们知道,对于 Flink 内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到 exactly-once 的一致性语义了。需要注意的是,我们说检查点能够保证故障恢复后数据只处理一次,并不是说之前统计过某个数据,现在就不能再次统计了;而是要看状态的改变和输出的结果,是否只包含了一次这个数据的处理。由于检查点保存的是之前所有任务处理完某个数据后的状态快照,所以重放的数据引起的状态改变一定不会包含在里面,最终结果中只处理了一次。所以,端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。
3.1、 输入端保证
输入端主要指的就是 Flink 读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了。例如 socket 文本流就是这样, socket服务器是不负责存储数据的,发送一条数据之后,我们只能消费一次,是“一锤子买卖”。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证 at-most-once 的一致性语义,相当于没有保证。
想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是 Kafka。在 Flink的 Source 任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。数据源可重放数据,或者说可重置读取数据偏移量,加上 Flink 的 Source 算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到 at-least-once 一致性语义的基本要求,当然也是实现端到端 exactly-once 的基本要求。
3.2、 输出端保证
有了 Flink 的检查点机制,以及可重放数据的外部数据源,我们已经能做到 at-least-once了。但是想要实现 exactly-once 却有更大的困难:数据有可能重复写入外部系统。因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink 任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对 Flink 内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次(检查点在持久化时某一时刻会有两份检查点:旧的检查点和正在保存的,只有正在保存的检查点保存成功了才会替换掉旧的检查点);但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。所以这时,我们只保证了端到端的 at-least-once 语义。为了实现端到端 exactly-once,我们还需要对外部存储系统、以及 Sink 连接器有额外的要求。能够保证 exactly-once 一致性的写入方式有两种:
- 幂等写入
- 事务写入
我们需要外部存储系统对这两种写入方式的支持,而 Flink 也为提供了一些 Sink 连接器接口。接下来我们进行展开讲解。
1. 幂等(idempotent)写入
所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。
数学中一个典型的例子是,ex 的求导下操作,无论做多少次,得到的都是自身。而在数据处理领域,最典型的就是对 HashMap 的插入操作:如果是相同的键值对,后面的重复插入就都没什么作用了。这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如 Redis 中键值存储,或者关系型数据库(如 MySQL)中满足查询条件的更新操作。需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的。
2. 事务(transactional)写入
如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。之前我们提到,输出端最大的问题就是“覆水难收”,写入到外部系统的数据难以撤回。自然想到,那怎样可以收回一条已写入的数据呢?利用事务就可以做到。我们都知道,事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所做的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的 ACID。在 Flink 流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当 Sink 任务遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)
(1)预写日志(write-ahead-log,WAL)
我们发现,事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,能够实现事务写入呢?预写日志(WAL)就是一种非常简单的方式。具体步骤是:
①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统。
我们会发现,这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在 Flink 中 DataStream API 提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。
需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink 最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。
(2)两阶段提交(two-phase-commit,2PC)
前面提到的各种实现 exactly-once 的方式,多少都有点缺陷,有没有更好的方法呢?自然是有的,这就是传说中的两阶段提交(2PC)。顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持。
具体的实现步骤为:
①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
③当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。
当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC)的方式充分利用了 Flink 现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自 JobManager 的 checkpoint 成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以 2PC 协议不仅真正意义上实现了 exactly-once,而且通过搭载 Flink 的检查点机制来实现事务,只给系统增加了很少的开销。Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我们自定义实现两阶段提交的SinkFunction 的实现,提供了真正端到端的 exactly-once 保证。
不过两阶段提交虽然精巧,却对外部系统有很高的要求。这里将 2PC 对外部系统的要求,列举如下:
- 外部系统必须提供事务支持,或者 Sink 任务必须能够模拟外部系统上的事务。
- 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
- 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
- Sink 任务必须能够在进程失败后恢复事务。
- 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。
可见,2PC 在实际应用同样会受到比较大的限制。具体在项目中的选型,最终还应该是一致性级别和处理性能的权衡考量。
3.3、 Flink 和 Kafka 连接时的精确一次保证
在流处理的应用中,最佳的数据源当然就是可重置偏移量的消息队列了;它不仅可以提供数据重放的功能,而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表,Kafka 可以说与 Flink 是天作之合,实际项目中也经常会看到以 Kafka 作为数据源和写入的外部系统的应用。这里,我们就来具体讨论一下 Flink 和 Kafka 连接时,怎样保证端到端的 exactly-once 状态一致性。
这里我们的并行度为2,输入端是 Kafka ,数据从 Kafka产生,经过 Flink 处理之后再次输出到 Kafka。
1. 我们看到,两个并行度下,Kafka 的两个分区分别把单词 ‘c’ 和 ‘a’ 发送到下游的Source1 和 Source2,此时正好 JobManager 发出 Barrier ,于是Source1 和 Source2将各自的偏移量持久化到检查点当中。
2. 这里我们不考虑中间Flink内部算子怎么操作持以及久化检查点的,我们主要关心输出端是如何实现精确一次的。我们看到第一条数据到了 Sink 算子后(注意是整个程序的第一条数据并不是每个sink节点接收到的第一条数据,这里的图例有误), Sink 节点开启第一次事务(也就是第一个数据到下一个 Barrier 之间的数据将被保存为第一个版本的检查点状态),预提交开始。同时会将事务的状态保存到状态。
3. 预提交阶段:到达Sink的数据会调用 Kafka producer 的 send() 方法,数据写入缓冲区,再 flush() 。此时数据写入到 Kafka,标记为“未提交”状态,如果任意一个 Sink 节点预提交过程中出现失败,整个预提交会放弃(虽然放弃,但是毕竟数据已经写入到了 Kafka,我们Flink不可能进去Kafka去删除数据,只能在读取数据的时候对于标记为“预提交”的数据选择视而不见)。
4. id=1的barrier到达sink节点,触发barrier节点的本地状态保存到hdfs本地状态,包含自身的状态和事务快照。同时第一轮检查点保存结束,再次开启一个新的Kafka事务,用于该barrier后面的数据的预提交。只有第一个事务是由第一个数据开启,之后的事务都是由barrier开启。
5. 当全部的 节点做完本地的checkpoint,jobmanager向所有节点发送一个本轮成功的回调消息(JobManager就知道了本轮id的barrier持久化状态任务已经完成),预提交结束。
6. sink 收到chekpoint 完成的通知,进行事务的正式提交,将写入Kafka的数据标记修改为“已提交”,如果发生障碍回滚到上次完成快照的时间点。
7. 成功正式提交后,Kafka 会返回一个消息给sink节点,sink节点会将存在状态里的事务状态修改为finished状态。
1. 整体介绍
既然是端到端的 exactly-once,我们依然可以从三个组件的角度来进行分析:
(1)Flink 内部
Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义(也就是开启检查点并设置状态一致性语义为精准一次)。
(2)输入端
输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在 Source 任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器 FlinkKafkaConsumer 向 Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
(3)输出端
输出端保证 exactly-once 的最佳实现,当然就是两阶段提交(2PC)。作为与 Flink 天生一对的 Kafka,自然需要用最强有力的一致性保证来证明自己。Flink 官方实现的 Kafka 连接器中,提供了写入到 Kafka 的 FlinkKafkaProducer,它就实现了 TwoPhaseCommitSinkFunction 接口。
也就是说我们写入 Kafka 的过程其实是一个两段式的提交处理完毕,得到结果写入 Kafka 是基于事物的“预提交”,等到检查点保存完毕才会提交事务,进行正式提交,如果中间出现故障,事故进行回滚,预提交就会被放弃,恢复状态之后也只能恢复所有已确认提交的操作。
2. 需要的配置
在具体应用中,实现真正的端到端 exactly-once ,还需要有一些额外的配置:
- 必须启用检查点
- 指定 Kafka Sink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE
- 配置 Kafka 读取数据的消费者隔离级别
这里所说的 Kafka ,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommited ,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了 。所以应该将隔离级别进行配置。
为 read_commited ,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费就会有显著的延迟。
4. 事务超时配置
如果 checkpoint 周期 大于 事务时间,很可能我们要提交的时候事务已经关闭,所以我们要保证事务的超时大于checkpoint周期。
Flink 的 Kafka 连接器中配置的事务超时时间 transaction.timeout.ms 默认是一小时,而 Kafka 集群配置的事务超时时间 transaction.timeout.ms 默认是十五分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发送故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间前者应该小于等于后者。
编码演示
public class KafkaEOSDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO 1. 检查点配置 // 1. 周期为 5s 默认就是barrier对齐的精准一次 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 2. 指定检查点的存储位置 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.setCheckpointStorage("file:///D://Desktop//FlinkStudy/chk");// 一般我们会存到云端 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // TODO 2. 读取 Kafka // 从 Kafka 读取 KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") //指定kafka地址和端口 .setGroupId("lyh") // 指定消费者组id .setTopics("like") // 指定消费的topic,可以是多个用List<String> .setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因为kafka是生产者 flink作为消费者要反序列化 .setStartingOffsets(OffsetsInitializer.latest()) // 指定flink消费kafka的策略 .build(); DataStreamSource<String> kafka_source = env .fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)), "kafkaSource"); // TODO 3. 写出到 Kafka /* 写到 kafka 的一致性级别: 精准一次 / 至少一次 如果是精准一次 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE) 2.必须设置事务的前缀 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟 */ KafkaSink<String> kafkaSink = KafkaSink.<String>builder() // 指定 kafka 的地址和端口 .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定序列化器 我们是发送方 所以我们是生产者 .setRecordSerializer( KafkaRecordSerializationSchema.<String>builder() .setTopic("wc") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 开启两阶段提交 .setTransactionalIdPrefix("lyh-") // 事务前缀 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"") .build(); kafka_source.sinkTo(kafkaSink); env.execute(); } }
我们在命令行开启一个Kafka消费者来消费Flink写入到Kafka的"wc"主题的数据,运行程序可以发现,当我们的生产者刚发送数据,还没到检查点周期结束呢就被保存了(现象就是生产者刚发送一条数据,消费者已经读取到了)。
我们可以在源码中看到,默认 Kafka 的消费者的隔离级别是读未提交,这种情况下,预提交的数据也会被读取到(这是不满足端到端精准一次的,因为如果我们的中间出故障了,预提交的数据应该被丢弃,但是显然现在预提交的数据已经被读取到了,事实上我们应该等到预处理的数据被标记为已提交的时候才能被读取),所以我们需要配置 Kafka 的消费者隔离级别。
我们开启一个消费者来读取我们Flink 写入到 Kafka 中的数据:
public class KafkaEOSSourceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 从 Kafka 读取 KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") //指定kafka地址和端口 .setGroupId("lyh") // 指定消费者组id .setTopics("ws") // 指定消费的topic,可以是多个用List<String> .setValueOnlyDeserializer(new SimpleStringSchema()) // 指定反序列化器 因为kafka是生产者 flink作为消费者要反序列化 .setStartingOffsets(OffsetsInitializer.latest()) // 指定flink消费kafka的策略 .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") .build(); env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2000L)),"kafkaSource") .print(); env.execute(); } /* * kafka 消费者的参数: * auto.reset.offsets: * earliest: 如果有offset,从offset继续消费;如果没有 就从 最早 消费 * latest : 如果有offset,从offset继续消费;如果没有 就从 最新 消费 * flink 的 kafkaSource offset消费者策略: offsetsInitializer,默认是 earliest * earliest: 一定从 最早 消费 (不管有没有offset) * latest : 一定从 最新 消费 (不管有没有offset) */ }
我们需要观察的是,当我们生产者生产一条数据后,多久才会写入到Kafka,是不是在一个checkpoint周期(我们这里设置的是5s)之后,如果是5s之后,说明是按照2pc来提交的。
所以,端到端精准一次对输出端(一般都是Kafka)是有要求的,比如这里就要求必须设置消费者隔离级别为 read_committed 。
总结
Flink 的容错机制终于是过完了,用时3天左右,收获满满,期待下次复习以及背面试题的时候再来了解,这种底层的原理是真的有意思。希望 Flink 以后可以是我工作的主要工具,太爱了。