大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下内容:


Kafka 一致性保证

LogAndOffset(LEO)

HightWatermark(HW)

Leader和Follower何时更新 LEO

Leader和Follower何时更新 HW

833b0a15561a2b3569ffe39c176ecd75_bf10c86a6db4482a87c82aa6b642dec4.png 基本介绍

消息重复和丢失是Kafka中很常见的问题,主要发生在以下三个阶段:


生产者阶段

Broke阶段

消费者阶段

生产者阶段丢失

出现场景

生产者发送消息没有收到正确Broke的响应,导致生产者重试。

生产者发送出一条消息,Broker落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。


重试过程

c3595f688a79f2ffa8ce2421582249b8_e5dc69d8183d4c7f9c09cacb6c651562.png 上图说明:


new KafkaProducer()创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息

调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中

后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到Kafka集群

如果发送成功,那么返回成功

如果发送失败,判断是否重试,如果不允许重试则失败。允许重试则再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送

可恢复异常

异常是 RetriableException类型 或者 TransactionManager允许重试,RetriableException类集成关系如下:

bcf16baa701bc3eebd76882c501c8938_dd34538277a8486e99f431ef49eb1a86.png

消息顺序问题

如果设置max.in.flight.requests.per.connection大于1(默认5,单个连接上发送的未确认的请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出去了)。大于1可能会改变记录的顺序,因为如果将两个Batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费掉。

如果设置max.in.flight.requests.per.connection等于1,则可能会影响吞吐量,可以解决单个生产者发送顺序问题,如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,且重试成功,此时虽然1先发送,但是2是先被消费的。

解决方案

幂等性

启动Kafka幂等性:


enable.idempotence=true

ack=all

retries>=1

ack=0且不重试

可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,比如:日志采集。


生产者-Broker阶段丢失

出现场景

ack=0且不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了


ack=1, Leader宕机

生产者发送完消息,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来得及同步,消息丢失。


unclean.leader.election.enable配置为true

允许选举ISR以外的副本作为Leader,会导致数据丢失,默认为False。生产者发送异步消息,只等待Leader写入成功就返回,Leader分区丢失,此时ISR中就没有Follower,Leader从OSR中选举,因为OSR中本来就落后于Leader,造成了消息的丢失。


解决方案

禁用unclean选举 ACK=ALL

ack=all 或者 -1

tries > 1

unclean.leader.election.enable = false

生产者发送完消息后,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般是3个。

不允许unclean的Leader参与选举。

min.insync.replicas > 1

当生产者acks设置all(或-1)时,min.insync.replicas > 1。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常。(可能是NotEnoughReplicas,可能是NotEnoughReplicasAfterAppend)。

当一起使用时,min.insync.replicas和ack允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用all配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。


失败的offset单独记录

生产者发送消息,会自动重试,遇到不可能恢复异常会跳出。这是可以捕获异常记录到数据库或者缓存,进行单独的处理。


消费者数据重复场景

出现场景

数据消费完没有及时的提交offset到Broker。

消费消息端在消费过程中挂掉没有及时的提交offset到Broker,另一个消费端启动之后拿到之前的offset记录开始消费,由于offset的滞后性可能会导致启动的客户端有少量的重复消费。


解决方案

取消自动提交

每次消费完或者程序退出时手动提交,这可能也没法保证一条重复。


下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里做事务来保证精确的一次更新。

或在下游数据表里同时记录消费offset,然后更新下游数据时用消费位移做乐观锁拒绝旧位移的数据更新


__Consumer_offsets

ZooKeeper不适合大批量的频繁写入操作

Kafka1.0.2将Consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka-consumer-groups.sh脚本供用户查看consumer的信息。


目录
相关文章
|
16天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
48 2
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
27 2
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
33 1
|
29天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
44 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
257 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
66 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
127 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。