关于Kafka消费者的这些参数你应该要知道?

简介: 关于Kafka消费者的这些参数你应该要知道?

1、Kafka Consumer核心参数览


个人觉得,要想深入了解Kafka Consumer的核心工作机制可以从它的核心参数切入,为后续深入了解它的队列负载机制、消息拉取模型、消费模型、位点提交等机制打下基础。

kafka Consumer的核心属性定义在ConsumerConfig中。


1.1 基础功能参数


  • group.id
    消费组名称。
  • client.id
    客户端标识id,默认为consumer-序号,在实践中建议包含客户端IP,在一个消费组中不能重复
  • bootstrap.servers
    broker服务端地址列表。
  • client.dns.lookup
    客户端寻找bootstrap地址的方式,支持如下两种方式:
  • resolve_canonical_bootstrap_servers_only
    这种方式,会依据bootstrap.servers提供的主机名(hostname),根据主机上的名称服务返回其IP地址的数组(InetAddress.getAllByName),然后依次获取inetAddress.getCanonicalHostName(),再建立tcp连接。
    一个主机可配置多个网卡,如果启用该功能,应该可以有效利用多网卡的优势,降低Broker的网络端负载压力
  • use_all_dns_ips
    这种方式会直接使用bootstrap.servers中提供的hostname、port创建tcp连接,默认选项。
  • enable.auto.commit
    是否开启自动位点提交,默认为true。
  • auto.commit.interval.ms
    如果开启自动位点提交,位点的提交频率,默认为5s。
  • partition.assignment.strategy
    消费端队列负载算法,默认为按区间平均分配(RangeAssignor),可选值:轮询(RoundRobinAssignor)
  • auto.offset.reset
    重置位点策略,但kafka提交位点时,对应的消息已被删除时采取的恢复策略,默认为latest,可选:earliest、none(会抛出异常)。
  • key.deserializer
    使用的key序列化类
  • value.deserializer
    消息体序列化类
  • interceptor.classes
    消费端拦截器,可以有多个。
  • check.crcs
    在消费端时是否需要校验CRC,默认为true。


1.2 网络相关参数


  • send.buffer.bytes
    网络通道(TCP)的发送缓存区大小,默认为128K。
  • receive.buffer.bytes
    网络通道(TCP)的接收缓存区大小,默认为32K。
  • reconnect.backoff.ms
    重新建立链接的等待时长,默认为50ms,属于底层网络参数,基本无需关注。
  • reconnect.backoff.max.ms
    重新建立链接的最大等待时长,默认为1s,连续两次对同一个连接建立重连,等待时间会在reconnect.backoff.ms的初始值上成指数级递增,但超过max后,将不再指数级递增。
  • retry.backoff.ms
    重试间隔时间,默认为100ms。
  • connections.max.idle.ms
    连接的最大空闲时间,默认为9s。
  • request.timeout.ms
    请求的超时时间,与Broker端的网络通讯的请求超时时间


1.3 核心工作参数


  • max.poll.records
    每一次poll方法调用拉取的最大消息条数,默认为500。
  • max.poll.interval.ms
    两次poll方法调用的最大间隔时间,单位毫秒,默认为5分钟。如果消费端在该间隔内没有发起poll操作,该消费者将被剔除,触发重平衡,将该消费者分配的队列分配给其他消费者
  • session.timeout.ms
    消费者与broker的心跳超时时间,默认10s,broker在指定时间内没有收到心跳请求,broker端将会将该消费者移出,并触发重平衡
  • heartbeat.interval.ms
    心跳间隔时间,消费者会以该频率向broker发送心跳,默认为3s,主要是确保session不会失效。
  • fetch.min.bytes
    一次拉取消息最小返回的字节数量,默认为1字节。
  • fetch.max.bytes
    一次拉取消息最大返回的字节数量,默认为1M,如果一个分区的第一批消息大小大于该值也会返回。
  • max.partition.fetch.bytes
    一次拉取每一个分区最大拉取字节数,默认为1M。
  • fetch.max.wait.ms
    fetch等待拉取数据符合fetch.min.bytes的最大等待时间。
  • metadata.max.age.ms
    元数据在客户端的过期时间,过期后客户端会向broker重新拉取最新的元数据,默认为5分钟。
  • internal.leave.group.on.close
    消费者关闭后是否立即离开订阅组,默认为true,即当客户端断开后立即触发重平衡。如果设置为false,则不会立即触发重平衡,而是要等session过期后才会触发。


2、KafkaConsumer核心组件与API


通过KafkaConsumer核心参数,我们基本可以窥探Kafka中的核心要点,接下来再介绍一下KafkaConsumer的核心组件,为后续深入研究Kafka消费者消费模型打下基础。


2.1 核心组件


59309d1a2a798726bd1f984aa7576765.png

KafkaConsumer由如下几个核心组件构成:


  • ConsumerNetworkClient
    消费端网络客户端,服务底层网络通讯,负责客户端与服务端的RPC通信。
  • ConsumerCoordinator
    消费端协调器,在Kafka的设计中,每一个消费组在集群中会选举一个broker节点成为该消费组的协调器,负责消费组状态的状态管理,尤其是消费组重平衡(消费者的加入与退出),该类就是消费者与broker协调器进行交互。
  • Fetcher
    消息拉取。

温馨提示:本文不打算对每一个组件进行详细解读,这里建议大家按照本文第一部分关于各个参数的含义,然后对照这些参数最终是传resume递给哪些组件,进行一个关联思考。


2.2 核心API概述


最后我们再来看一下消费者的核心API。

c25c9de07b16b7d9c2e1303ef9741f97.png

  • Set< TopicPartition> assignment()
    获取该消费者的队列分配列表。
  • Set< String> subscription()
    获取该消费者的订阅信息。
  • void subscribe(Collection< String> topics)
    订阅主题。
  • void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)
    订阅主题,并指定队列重平衡的监听器。
  • void assign(Collection< TopicPartition> partitions)
    取代 subscription,手动指定消费哪些队列。
  • void unsubscribe()
    取消订阅关系。
  • ConsumerRecords
    poll(Duration timeout)
    拉取消息,是 KafkaConsumer 的核心方法,将在下文详细介绍。
  • void commitSync()
    同步提交消费进度,为本批次的消费提交,将在后续文章中详细介绍。
  • void commitSync(Duration timeout)
    同步提交消费进度,可设置超时时间。
  • void commitSync(Map
    offsets)
    显示同步提交消费进度, offsets 指明需要提交消费进度的信息。
  • void commitSync(final Map
    offsets, final Duration timeout)
    显示同步提交消费进度,带超时间。
  • void seek(TopicPartition partition, long offset)
    重置 consumer#poll 方法下一次拉消息的偏移量。
  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
    seek 方法重载方法。
  • void seekToBeginning(Collection< TopicPartition> partitions)
    将 poll 方法下一次的拉取偏移量设置为队列的初始偏移量。
  • void seekToEnd(Collection< TopicPartition> partitions)
    将 poll 方法下一次的拉取偏移量设置为队列的最大偏移量。
  • long position(TopicPartition partition)
    获取将被拉取的偏移量。
  • long position(TopicPartition partition, final Duration timeout)
    同上。
  • OffsetAndMetadata committed(TopicPartition partition)
    获取指定分区已提交的偏移量。
  • OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)
    同上。
  • Map metrics()
    统计指标。
  • List< PartitionInfo> partitionsFor(String topic)
    获取主题的路由信息。
  • List< PartitionInfo> partitionsFor(String topic, Duration timeout)
    同上。
  • Map listTopics()
    获取所有 topic 的路由信息。
  • Map listTopics(Duration timeout)
    同上。
  • Set< TopicPartition> paused()
    获取已挂起的分区信息。
  • void pause(Collection< TopicPartition> partitions)
    挂起分区,下一次 poll 方法将不会返回这些分区的消息。
  • void resume(Collection< TopicPartition> partitions)
    恢复挂起的分区。
  • Map
    offsetsForTimes(MaptimestampsToSearch)
    根据时间戳查找最近的一条消息的偏移量。
  • Map
    offsetsForTimes(MaptimestampsToSearch, Duration timeout)
    同上。
  • Map
    beginningOffsets(Collection< TopicPartition> partitions)
    查询指定分区当前最小的偏移量。
  • Map
    beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)
    同上。
  • Map
    endOffsets(Collection< TopicPartition> partitions)
    查询指定分区当前最大的偏移量。
  • Map
    endOffsets(Collection< TopicPartition> partitions, Duration timeout)
    同上。
  • void close()
    关闭消费者。
  • void close(Duration timeout)
    关闭消费者。
  • void wakeup()
    唤醒消费者。


Kafka提供的消费者并不像RocketMQ提供了Push模式自动拉取消息,需要应用程序自动组织这些API进行消息拉取。


值得注意的kafka消费者也支持位点自动提交机制,kafka的消费者(KafkaConsumer)对象是线程不安全的


基于KafkaConsumer的pause(暂停某些分区的消费)与resume(恢复某些分区的消费),可以轻松实现消费端限流机制。


相关文章
|
11天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
40 2
|
1月前
|
消息中间件 SQL 分布式计算
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
26 6
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
134 62
|
1月前
|
消息中间件 存储 负载均衡
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
40 3
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
31 1
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
129 58
|
2月前
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
191 2
|
3月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
76 3
|
3月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
86 7
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
74 0