kafka-工作中遇到的问题

简介: kafka-工作中遇到的问题

最近生产上频繁暴雷,问题总结于以下几点:

1、kafka消费数据丢失:
历史代码如下:

由于该消费者日消费数据量可以达到6000万-1亿。上图代码中,用到的是每次拉取单条数据,然后提交给多线程,造成的结果是:
1、丢数据:没有等多线程结束,就提交了偏移量。部分数据处理业务逻辑时报错,但是偏移量已提交,数据丢失。
2、由于交给多线程工作,并发量大,导致系统连接数据库时爆掉。

优化后代码:去掉了代码中的多线程,修改kafka消费配置,使其可以并行消费。

kafka批量拉取数据:

重要参数介绍:

①concurrency:默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。我们可以通过启动多个进程,实现 多进程的并发消费;也取决于你的TOPIC的 partition的数量。 如果设置 concurrency=2 时,Spring-Kafka 就会为该 @KafkaListener标注的方法消费的消息 创建 2个线程,进行并发消费。 当然了,这是有前置条件的。 不要超过 partitions 的大小。
当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据

当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据

当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
Spring-Kafka 提供的并发消费,需要创建多个 Kafka Consumer 对象,并且每个 Consumer 都单独分配一个线程,然后 Consumer 拉取完消息之后,在各自的线程中执行消费。

本地代码实测:当concurrency=2时,起动本地环境,本地会有两个消费者同时工作。加上uat环境,相当于共有两台机器工作,每台机器有2个消费者,因此共有4个消费者。
比如:当前线上改topic的分区是36个。我们共有6台机器。所以concurrency设为6,应该才是最佳状态。

②batchListener: 设置batchListener参数为true, 可以支持批量消费

③max.poll.interval.ms: 使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。对于使用达到此超时的非空 group.instance.id 的消费者,不会立即重新分配分区。相反,消费者将停止发送心跳,并且分区将在 session.timeout.ms 到期后重新分配。这反映了已关闭的静态消费者的行为。默认300000 (5 minutes)

④max.poll.records: 在一次 poll() 调用中返回的最大记录数。请注意, max.poll.records 不会影响底层的获取行为。消费者将缓存来自每个获取请求的记录,并从每次轮询中递增地返回它们. 默认500.

⑤enable.auto.commit: 如果为 true,消费者的偏移量将在后台定期提交。如果需要手动提交,则设为false. 默认是true.

⑥auto.offset.reset:
earliest:自动将偏移量重置为最早的偏移量

    latest:自动将偏移量重置为最新的偏移量
    none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
    anything else:向消费者抛出异常

⑦session.timeout.ms: 用于检测工作程序故障的超时。工作人员定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理没有收到心跳,那么代理将从组中删除工作人员并启动重新平衡。请注意,该值必须在代理配置中 group.min.session.timeout.ms 和 group.max.session.timeout.ms 配置的允许范围内。默认 10000 (10 seconds)
⑧heartbeat.interval.ms: 使用 Kafka 的组管理设施时,与消费者协调器之间的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常应设置为不高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。默认 3000 (3 seconds)
⑨ContainerProperties.AckMode.MANUAL_IMMEDIATE:

eg: 生产kafka监控举例
① 当前生产者分区数为36,设置的消费者数为36,下图所示,达到最佳消费状态。

②当前生产者分区数为36,设置的消费者数为12,下图所示,未达到最佳消费状态。

相关文章
|
4天前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
4天前
|
消息中间件 存储 Kafka
【Kafka】Kafka 架构设计分析
【4月更文挑战第5天】【Kafka】kafka 架构设计分析
|
4天前
|
消息中间件 存储 算法
Kafka基础 (上)
Kafka基础 (上)
15 0
|
4天前
|
消息中间件 存储 Prometheus
【Kafka】Kafka 提供了哪些系统工具
【4月更文挑战第11天】【Kafka】Kafka 提供了哪些系统工具
|
4天前
|
消息中间件 运维 监控
【Kafka】Kafka生产过程中何时会发生QueueFullExpection以及如何处理
【4月更文挑战第11天】【Kafka】Kafka生产过程中何时会发生QueueFullExpection以及如何处理
|
7月前
|
消息中间件 存储 Kafka
Kafka最基础使用
Kafka最基础使用
79 0
|
4天前
|
消息中间件 Kafka
Kafka - 深入了解Kafka基础架构:Kafka的基本概念
Kafka - 深入了解Kafka基础架构:Kafka的基本概念
30 0
|
4天前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
44 0
|
10月前
|
消息中间件 SpringCloudAlibaba 大数据
kafka:基础篇
**Kafka传统定义**:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。**发布/订阅**:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。
109 0
kafka:基础篇
|
11月前
|
消息中间件 存储 缓存
【Kafka从入门到放弃系列 四】Kafka架构深入——生产者策略
【Kafka从入门到放弃系列 四】Kafka架构深入——生产者策略
146 0
【Kafka从入门到放弃系列 四】Kafka架构深入——生产者策略