大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下内容:


Kafka 一致性保证

LogAndOffset(LEO)

HightWatermark(HW)

Leader和Follower何时更新 LEO

Leader和Follower何时更新 HW

833b0a15561a2b3569ffe39c176ecd75_bf10c86a6db4482a87c82aa6b642dec4.png 基本介绍

消息重复和丢失是Kafka中很常见的问题,主要发生在以下三个阶段:


生产者阶段

Broke阶段

消费者阶段

生产者阶段丢失

出现场景

生产者发送消息没有收到正确Broke的响应,导致生产者重试。

生产者发送出一条消息,Broker落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。


重试过程

c3595f688a79f2ffa8ce2421582249b8_e5dc69d8183d4c7f9c09cacb6c651562.png 上图说明:


new KafkaProducer()创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息

调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中

后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到Kafka集群

如果发送成功,那么返回成功

如果发送失败,判断是否重试,如果不允许重试则失败。允许重试则再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送

可恢复异常

异常是 RetriableException类型 或者 TransactionManager允许重试,RetriableException类集成关系如下:

bcf16baa701bc3eebd76882c501c8938_dd34538277a8486e99f431ef49eb1a86.png

消息顺序问题

如果设置max.in.flight.requests.per.connection大于1(默认5,单个连接上发送的未确认的请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出去了)。大于1可能会改变记录的顺序,因为如果将两个Batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费掉。

如果设置max.in.flight.requests.per.connection等于1,则可能会影响吞吐量,可以解决单个生产者发送顺序问题,如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,且重试成功,此时虽然1先发送,但是2是先被消费的。

解决方案

幂等性

启动Kafka幂等性:


enable.idempotence=true

ack=all

retries>=1

ack=0且不重试

可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,比如:日志采集。


生产者-Broker阶段丢失

出现场景

ack=0且不重试

生产者发送消息完,不管结果了,如果发送失败也就丢失了


ack=1, Leader宕机

生产者发送完消息,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来得及同步,消息丢失。


unclean.leader.election.enable配置为true

允许选举ISR以外的副本作为Leader,会导致数据丢失,默认为False。生产者发送异步消息,只等待Leader写入成功就返回,Leader分区丢失,此时ISR中就没有Follower,Leader从OSR中选举,因为OSR中本来就落后于Leader,造成了消息的丢失。


解决方案

禁用unclean选举 ACK=ALL

ack=all 或者 -1

tries > 1

unclean.leader.election.enable = false

生产者发送完消息后,等待Follower同步完再返回,如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般是3个。

不允许unclean的Leader参与选举。

min.insync.replicas > 1

当生产者acks设置all(或-1)时,min.insync.replicas > 1。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发一个异常。(可能是NotEnoughReplicas,可能是NotEnoughReplicasAfterAppend)。

当一起使用时,min.insync.replicas和ack允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用all配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。


失败的offset单独记录

生产者发送消息,会自动重试,遇到不可能恢复异常会跳出。这是可以捕获异常记录到数据库或者缓存,进行单独的处理。


消费者数据重复场景

出现场景

数据消费完没有及时的提交offset到Broker。

消费消息端在消费过程中挂掉没有及时的提交offset到Broker,另一个消费端启动之后拿到之前的offset记录开始消费,由于offset的滞后性可能会导致启动的客户端有少量的重复消费。


解决方案

取消自动提交

每次消费完或者程序退出时手动提交,这可能也没法保证一条重复。


下游做幂等

一般是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID(例如订单ID)和下游状态更新放在同一个数据库里做事务来保证精确的一次更新。

或在下游数据表里同时记录消费offset,然后更新下游数据时用消费位移做乐观锁拒绝旧位移的数据更新


__Consumer_offsets

ZooKeeper不适合大批量的频繁写入操作

Kafka1.0.2将Consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka-consumer-groups.sh脚本供用户查看consumer的信息。


目录
相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
184 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
101 0
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
78 2
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
47 3
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
39 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
60 1
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
191 0
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
47 0
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
287 7