大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值

简介: 大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


Kafka拦截器

Kafka自定义拦截器

Kafka原理剖析

0ea48b8be06c7ed7a102f084605f6408_75daa9094a574a7799d39b19bbaa530d.png Kafka 消息消费(Message Consumption)

消息消费是 Kafka 系统中另一个重要的环节,它决定了数据如何从 Kafka 集群传递到应用程序中。


消费者(Consumer)与消费者组(Consumer Group)

消费者(Consumer):Kafka 消费者是从 Kafka 主题的分区中读取消息的客户端应用程序。每个消费者都可以独立地读取一个或多个分区的数据。

消费者组(Consumer Group):消费者组是由多个消费者组成的一个逻辑集合,用来共同消费一个主题中的消息。Kafka 中,每条消息只会被消费者组中的一个消费者消费,这使得消费者组成为了 Kafka 中的消息负载均衡机制。

消息消费的过程

分配分区:当一个消费者加入消费者组时,Kafka 会根据分区的数量和消费者的数量,分配特定的分区给消费者。每个分区只能被消费者组中的一个消费者消费,但一个消费者可以消费多个分区的数据。

拉取(Pull)模型:Kafka 使用拉取(Pull)模型,即消费者主动从 Kafka 中拉取消息。这样可以让消费者灵活地控制消息处理的速度和节奏。

偏移量管理:Kafka 中每条消息都有一个唯一的偏移量(Offset)。消费者在消费消息后,会提交当前的偏移量,以标识下次从哪里开始消费。Kafka 提供了自动提交和手动提交偏移量两种模式。

消费者组的优势

容错性:如果一个消费者发生故障,Kafka 会自动将该消费者的分区重新分配给组内的其他消费者,从而保证消息的持续消费。

负载均衡:通过消费者组,可以实现多个消费者之间的负载均衡,确保消息消费的高效性。

Kafka 的心跳机制(Heartbeat Mechanism)

心跳机制是 Kafka 保证消费者组成员关系稳定和消息消费一致性的重要机制。它用于检测消费者的存活状态,并帮助协调分区的重新分配。


心跳机制的工作原理

心跳(Heartbeat):消费者会定期向 Kafka 的协调者(Coordinator)发送心跳请求,以表明自己仍然存活并继续消费分配到的分区。如果协调者长时间没有收到某个消费者的心跳,便会认为该消费者已经失效,并触发再均衡(Rebalance)。

会话超时(Session Timeout):如果在会话超时时间内,协调者没有收到消费者的心跳,则认为消费者已失去联系,从而启动分区的重新分配过程。

再均衡(Rebalance):在再均衡过程中,Kafka 会重新分配分区给消费者组中的其他存活消费者。再均衡是一个相对较重的操作,因为在此期间消费者不能正常处理消息,所以应尽量避免频繁发生再均衡。

心跳机制的参数调优

session.timeout.ms:这是消费者与协调者之间的会话超时时间,通常设置为几秒到几十秒。当消费者在此时间内没有发送心跳,协调者便认为消费者已失效。

heartbeat.interval.ms:这是消费者发送心跳的间隔时间。通常设置为比 session.timeout.ms 更小的值,以确保协调者能及时感知消费者的状态。

max.poll.interval.ms:这是消费者调用 poll() 方法的最大间隔时间。如果消费者在这个时间内未进行消息拉取,也会被视为失效。

心跳机制的意义

保持消费者组的稳定性:心跳机制确保了 Kafka 能够及时检测到消费者的故障,并做出响应,以保持消费者组的稳定性和分区的有效消费。

优化消息处理的延迟:通过合理设置心跳机制相关的参数,可以减少不必要的再均衡,优化消息处理的延迟和系统的稳定性。

编号解释

P 表示 Partition 分区

C 表示 Consumer 消费者

(4-1)C,表示原来是4个C,离线了1个C。

消费组

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是:

__consumer_offsets

消费者可以将自己的偏移量存储到ZooKeeper,需要设置:

(推荐使用Kafka自己存储消费者的偏移量,因为ZooKeeper并不适合高并发场景)

offset.storage=zookeeper

多个消费者可以加入到一个消费组中,共享 group_id, group_id 一般设置为应用的逻辑名称。

configs.put("group_id", "xxx");
• 1

消费组01 4P1C

消费组均衡地给消费者分配分区,每个分区只有消费组中的一个消费者消费:

消费组02 4P,2C

一个拥有四个分区的主题,包含一个消费者的消费组。如果消费组有2个,则每个消费者分别从两个分区中接收消息。

消费组03 4P,4C

如果消费组有四个消费者,则每个消费者可以分配到一个分区。

消费组04 4P,C5

如果消费组中有过多的消费者,超过主题分区的数量,那么一部分消费者就会闲置,不会接受任何消息。

消费组05 4P,C4G1,C2G2

如果是两个消费组一起消费,会如图所示:

心跳机制

4P4C

假设正在消费稳定消费,会形成如下的样子:

4P,(4-1)C

消费过程中,如果消费者宕机,退出了消费组,触发了再平衡,重新给消费组中的消费者分配分区。

配置参数

Kafka的心跳是KafkaConsumer和Broker之间的健康检查,只有当Broker Coordinator 正常时,Consumer才会发起心跳。

其他的相关参数如下:

session.timeout.ms

含义:session.timeout.ms 是 Kafka 用来判断消费者是否存活的会话超时时间。消费者需要在这个时间范围内定期向协调者(Coordinator)发送心跳,以保持其在消费者组中的成员资格。

默认值:45,000 毫秒(45 秒)

配置建议:


如果消费者的心跳间隔时间超过这个超时时间,Kafka 会认为消费者已经失效,并触发分区的再均衡。

该值不宜过大,因为会延迟故障检测时间,但也不宜过小,以避免因网络抖动或短暂的 GC 停顿导致的错误移除。

heartbeat.interval.ms

含义:heartbeat.interval.ms 是消费者发送心跳的间隔时间。这个参数控制了消费者向协调者发送心跳的频率。

默认值:3,000 毫秒(3 秒)

配置建议:


通常这个值应小于 session.timeout.ms,以确保消费者能够在超时时间内多次发送心跳,从而避免被错误地视为失效。

如果设置过小,可能会增加协调者的负担和网络开销;如果设置过大,则可能导致在 session.timeout.ms 之前心跳次数不足。

max.poll.interval.ms

含义:max.poll.interval.ms 定义了消费者从 Kafka 拉取消息(调用 poll() 方法)的最大间隔时间。如果消费者在这个时间内没有进行消息拉取,Kafka 将认为消费者已经失效,导致其从消费者组中移除,并触发再均衡。

默认值:300,000 毫秒(5 分钟)

配置建议:


这个参数对于那些需要处理大量消息或耗时任务的消费者特别重要。如果消息处理时间过长,需要适当增加这个值。

如果消费者处理每批次消息的时间超过了这个间隔时间,可以通过调整 max.poll.interval.ms 来避免消费者被错误移除。

request.timeout.ms

含义:request.timeout.ms 定义了消费者等待来自 Kafka 服务器响应的最大时间。这个时间与心跳机制密切相关,因为如果消费者长时间未能接收到响应,可能会导致心跳失败。

默认值:30,000 毫秒(30 秒)

配置建议:


该值应大于 session.timeout.ms,以防止在网络延迟或 Kafka 服务器响应缓慢时,消费者错误地认为自己被移除。

fetch.max.wait.ms

含义:fetch.max.wait.ms 定义了消费者等待 Kafka 服务器返回消息的最大时间。与心跳机制相比,这个参数主要影响消息拉取的延迟。

默认值:500 毫秒

配置建议:


对于低延迟的消息消费场景,可以适当减小这个值;而对于高吞吐量的场景,可以结合 fetch.min.bytes 参数适当增加此值以优化批量拉取的性能。

metadata.max.age.ms

含义:metadata.max.age.ms 定义了消费者强制从 Kafka 服务器刷新元数据的最大间隔时间。元数据包括分区的位置信息、领导者信息等,这些信息对于心跳和再均衡过程至关重要。

默认值:300,000 毫秒(5 分钟)

配置建议:


这个参数不直接影响心跳机制,但间接影响再均衡过程。在频繁发生分区领导者变化的场景中,可以减少这个值以加快元数据更新速度。

auto.offset.reset

含义:auto.offset.reset 定义了当消费者无法找到有效的偏移量时(例如在分区重新分配或消费者首次启动时),应采取的策略。可选值包括 earliest(从最早的偏移量开始消费)和 latest(从最新的偏移量开始消费)。

默认值:latest

配置建议:


这个参数虽然不属于心跳参数的范畴,但对于消费者组重新平衡后的消费行为影响较大。在心跳检测后,如果出现偏移量丢失或错误配置,此参数决定了消费者如何恢复消费。

实际应用中的优化建议

在实际应用中,合理设置消费者和心跳机制的参数至关重要。通过合适的参数配置,可以在提高系统容错能力的同时,确保高效的消息处理和低延迟。


消费者组规模控制:避免消费者组内成员数量过多,以减少再均衡的频率和复杂性。

心跳机制参数调优:根据业务的实际需求,调整心跳和会话超时参数,平衡系统响应速度和消费者组稳定性之间的关系。


相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
3天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
335 14
|
18天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
6天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
23天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2588 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
5天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
178 2
|
3天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
104 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
299 2
|
22天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码