最近生产上频繁暴雷,问题总结于以下几点:
1、kafka消费数据丢失:
历史代码如下:
由于该消费者日消费数据量可以达到6000万-1亿。上图代码中,用到的是每次拉取单条数据,然后提交给多线程,造成的结果是:
1、丢数据:没有等多线程结束,就提交了偏移量。部分数据处理业务逻辑时报错,但是偏移量已提交,数据丢失。
2、由于交给多线程工作,并发量大,导致系统连接数据库时爆掉。
优化后代码:去掉了代码中的多线程,修改kafka消费配置,使其可以并行消费。
kafka批量拉取数据:
重要参数介绍:
①concurrency:默认情况下, Spring-Kafka @KafkaListener 串行消费的。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。我们可以通过启动多个进程,实现 多进程的并发消费;也取决于你的TOPIC的 partition的数量。 如果设置 concurrency=2 时,Spring-Kafka 就会为该 @KafkaListener标注的方法消费的消息 创建 2个线程,进行并发消费。 当然了,这是有前置条件的。 不要超过 partitions 的大小。
当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
Spring-Kafka 提供的并发消费,需要创建多个 Kafka Consumer 对象,并且每个 Consumer 都单独分配一个线程,然后 Consumer 拉取完消息之后,在各自的线程中执行消费。
本地代码实测:当concurrency=2时,起动本地环境,本地会有两个消费者同时工作。加上uat环境,相当于共有两台机器工作,每台机器有2个消费者,因此共有4个消费者。
比如:当前线上改topic的分区是36个。我们共有6台机器。所以concurrency设为6,应该才是最佳状态。
②batchListener: 设置batchListener参数为true, 可以支持批量消费
③max.poll.interval.ms: 使用消费者组管理时调用 poll() 之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用 poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。对于使用达到此超时的非空 group.instance.id 的消费者,不会立即重新分配分区。相反,消费者将停止发送心跳,并且分区将在 session.timeout.ms 到期后重新分配。这反映了已关闭的静态消费者的行为。默认300000 (5 minutes)
④max.poll.records: 在一次 poll() 调用中返回的最大记录数。请注意, max.poll.records 不会影响底层的获取行为。消费者将缓存来自每个获取请求的记录,并从每次轮询中递增地返回它们. 默认500.
⑤enable.auto.commit: 如果为 true,消费者的偏移量将在后台定期提交。如果需要手动提交,则设为false. 默认是true.
⑥auto.offset.reset:
earliest:自动将偏移量重置为最早的偏移量
latest:自动将偏移量重置为最新的偏移量
none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
anything else:向消费者抛出异常
⑦session.timeout.ms: 用于检测工作程序故障的超时。工作人员定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理没有收到心跳,那么代理将从组中删除工作人员并启动重新平衡。请注意,该值必须在代理配置中 group.min.session.timeout.ms 和 group.max.session.timeout.ms 配置的允许范围内。默认 10000 (10 seconds)
⑧heartbeat.interval.ms: 使用 Kafka 的组管理设施时,与消费者协调器之间的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session.timeout.ms,但通常应设置为不高于该值的 1/3。它可以调整得更低,以控制正常重新平衡的预期时间。默认 3000 (3 seconds)
⑨ContainerProperties.AckMode.MANUAL_IMMEDIATE:
eg: 生产kafka监控举例
① 当前生产者分区数为36,设置的消费者数为36,下图所示,达到最佳消费状态。
②当前生产者分区数为36,设置的消费者数为12,下图所示,未达到最佳消费状态。