Flink CDC里EXACTLY_ONCE semantic, but checkpointing is not enabled. Switching to NONE semantic. 为什么开checkpoint producer不可以设置EXACTLY_ONCE?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中,当您尝试将Kafka Producer的语义设置为EXACTLY_ONCE
时,如果未启用Flink的Checkpoint机制,系统会自动降级为NONE
语义。这是因为EXACTLY_ONCE
语义的实现依赖于Flink的Checkpoint机制来确保数据的一致性和可靠性。
以下是详细的原因和背景说明:
EXACTLY_ONCE
语义的核心是通过Flink的Checkpoint机制来保证数据的精确一次处理。具体来说: - Flink会在每次Checkpoint时,将Kafka事务的状态与Flink的状态进行同步。 - Kafka Producer会使用事务来确保消息的写入要么完全成功,要么完全回滚,从而避免数据丢失或重复。 - 如果未启用Checkpoint,Flink无法记录Kafka事务的状态,也就无法保证EXACTLY_ONCE
语义。
因此,当Checkpoint未启用时,Flink无法提供事务性保障,系统会自动切换到NONE
语义(即不提供任何保障)。
为了支持EXACTLY_ONCE
语义,您需要在Flink作业中显式启用Checkpoint,并正确配置相关参数。以下是一个典型的配置示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用Checkpoint,设置间隔时间(例如10秒)
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
此外,还需要确保以下几点: - Kafka集群支持事务:Kafka Broker版本需为0.11及以上,且启用了事务功能。 - Producer池大小与并发Checkpoint数量匹配:EXACTLY_ONCE
模式下,Flink会使用一个固定大小的Kafka Producer池。如果并发的Checkpoint数量超过Producer池的大小,可能会导致作业失败。
即使启用了Checkpoint,使用EXACTLY_ONCE
语义时仍需注意以下事项: - 事务延迟:在Flink作业正常运行期间,写入Kafka的数据可能会有延迟,延迟时间约为Checkpoint的平均间隔。 - 故障恢复:如果Flink作业在第一个Checkpoint之前发生故障,重启后不会保留之前的Producer池信息。因此,在第一个Checkpoint完成之前缩减并行度是不安全的[1]</。 - Consumer隔离级别:对于运行在read_committed
模式下的Kafka Consumer,任何未结束的事务都会阻塞对该Topic的读取。因此,建议合理设置事务超时时间,以避免长时间阻塞。
如果您希望使用EXACTLY_ONCE
语义,请按照以下步骤操作: 1. 启用Checkpoint:
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
DeliveryGuarantee.EXACTLY_ONCE
:
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("your-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
transaction.timeout.ms
以避免事务超时。EXACTLY_ONCE
语义的实现依赖于Flink的Checkpoint机制。如果未启用Checkpoint,Flink无法保证事务性写入,因此会自动降级为NONE
语义。要解决此问题,请确保在Flink作业中启用Checkpoint,并正确配置Kafka Producer的相关参数。
如果您仍有疑问或需要进一步的帮助,请提供更多上下文信息,我将为您详细解答。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。