图解Kafka消息发送者核心参数与工作机制

简介: 图解Kafka消息发送者核心参数与工作机制

将Kafka Producer相关的参数分成如下几个类型:


  • 常规参数
  • 工作原理(性能相关)参数(图解)


本文会结合图解方式,重点阐述与Kafka生产者运作机制密切相关的参数。


1、常规参数


为了更好的使用Kafka Producer,首先介绍一下几个基本参数。


  • bootstrap.servers
    配置Kafka broker的服务器地址列表,多个用英文逗号分开,可以不必写全,Kafka内部有自动感知Kafka broker的机制。
  • client.dns.lookup
    客户端寻找bootstrap地址的方式,支持如下两种方式:
  • resolve_canonical_bootstrap_servers_only
    这种方式,会依据bootstrap.servers提供的主机名(hostname),根据主机上的名称服务返回其IP地址的数组(InetAddress.getAllByName),然后依次获取inetAddress.getCanonicalHostName(),再建立tcp连接。
    一个主机可配置多个网卡,如果启用该功能,应该可以有效利用多网卡的优势,降低Broker的网络端负载压力
  • use_all_dns_ips
    这种方式会直接使用bootstrap.servers中提供的hostname、port创建tcp连接,默认选项。
  • compression.type
    消息的压缩算法,目前可选值:none、gzip、snappy、lz4、zstd,默认不压缩,建议与Kafka服务器配置的一样,当然Kafka服务端可以配置的压缩类型为 producer,即采用与发送方配置的压缩类型。发送方与Broker 服务器采用相同的压缩类型,可有效避免在Broker服务端进行消息的压缩与解压缩,大大降低Broker的CPU使用压力。
  • client.id
    客户端ID,如果不设置默认为producer-递增,强烈建议设置该值,尽量包含ip,port,pid。
  • send.buffer.bytes
    网络通道(TCP)的发送缓存区大小,默认为128K。
  • receive.buffer.bytes
    网络通道(TCP)的接收缓存区大小,默认为32K。
  • reconnect.backoff.ms
    重新建立链接的等待时长,默认为50ms,属于底层网络参数,基本无需关注。
  • reconnect.backoff.max.ms
    重新建立链接的最大等待时长,默认为1s,连续两次对同一个连接建立重连,等待时间会在reconnect.backoff.ms的初始值上成指数级递增,但超过max后,将不再指数级递增。
  • key.serializer
    消息key的序列化策略,为org.apache.kafka.common.serialization接口的实现类。
  • value.serializer
    消息体的序列化策略
  • partitioner.class
    消息发送队列负载算法,其默 DefaultPartitioner,路由算法如下:
  • 如果指定了 key ,则使用 key 的 hashcode 与分区数取模。
  • 如果未指定 key,则轮询所有的分区。
  • interceptor.classes
    拦截器列表,kafka运行在消息真正发送到broker之前对消息进行拦截加工。
  • enable.idempotence
    是否开启发送端的幂等,这个机制后续会重点剖析其实现原理,默认为false。
  • transaction.timeout.ms
    事务协调器等待客户端的事务状态反馈的最大超时时间,默认为60s。
  • transactional.id
    事务id,用于在一个事务中唯一标识一个客户端。


2、工作原理相关参数


2.1 核心参数一览


工作机制相关参数,涉及到消息发送是如何工作的,本节首先将罗列参数,做简单说明,然后再给出运作图,进一步阐述其工作机制。


  • buffer.memory
    用于设置一个生产者(KafkaProducer)中缓存池的内存大小,默认为32M。
  • max.block.ms
    当消息发送者申请空闲内存时,如果可用内存不足的等待时长,默认为60s,如果在指定时间内未申请到内存,消息发送端会直接报TimeoutException,这个时间包含了发送端用于查找元信息的时间
  • retries
    重试次数,Kafka Sender线程从缓存区尝试发送到Broker端的重试次数,默认为Integer.MAX_VALUE,为了避免无限重试,只针对可恢复的异常,例如Leader选举中这种异常就是可恢复的,重试最终是能解决问题的。
  • acks
    用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下:
  • 0
    表示生产者不关心该条消息在 broker 端的处理结果,只要调用 KafkaProducer 的 send 方法返回后即认为成功,显然这种方式是最不安全的,因为 Broker 端可能压根都没有收到该条消息或存储失败。
  • all 或 -1
    表示消息不仅需要 Leader 节点已存储该消息,并且要求其副本(准确的来说是 ISR 中的节点)全部存储才认为已提交,才向客户端返回提交成功。这是最严格的持久化保障,当然性能也最低。
  • 1
    表示消息只需要写入 Leader 节点后就可以向客户端返回提交成功。
  • batch.size
    在消息发送端Kafka引入了批的概念,发送到服务端的消息通常不是一条一条发送,而是一批一批发送,该值用于设置每一个批次的内存大小,一个批次对应源码层级为ProducerBatch对象,默认为16K。
  • linger.ms
    该参数与batch.size配合使用。Kafka希望一个批次一个批次去发送到Broker,应用程序往KafkaProducer中发送一条消息,首先会进入到内部缓冲区,具体是会进入到某一个批次中(ProducerBatch),等待该批次堆满后一次发送到Broker,这样能提高消息的吞吐量,但其消息发送的延迟也会相应提高,试想一下,如果在某一个时间端,应用端发送到broker的消息太少,不足以填满一个批次,那岂不是消息一直无法发送到Broker端吗?
    为了解决该问题,linger.ms参数应运而生。它的作用是控制在缓存区中未积满时来控制消息发送线程的行为。如果linger.ms 设置为 0表示立即发送,如果设置为大于0,则消息发送线程会等待这个值后才会向broker发送。有点类似于 TCP 领域的 Nagle 算法
  • delivery.timeout.ms
    消息在客户端缓存中的过期时间,在Kafka的消息发送模型中,消息先进入到消息发送端的双端缓存队列中,然后单独一个线程将缓存区中的消息发送到Broker,该参数控制在双端队列中的过期时间,默认为120s,从进入双端队列开始计时,超过该值后会返回超时异常(TimeoutException)。
  • request.timeout.ms
    请求的超时时间,主要是Kafka消息发送线程(Sender)与Broker端的网络通讯的请求超时时间
  • max.request.size
    Send线程一次发送的最大字节数量,也就是Send线程向服务端一次消息发送请求的最大传输数据,默认为1M。
  • max.in.flight.requests.per.connection
    设置每一个客户端与服务端连接,在应用层一个通道的积压消息数量,默认为5,有点类似Netty用高低水位线控制发送缓冲区中积压的多少,避免内存溢出。


2.2 图解工作原理


上面的核心参数在表述上可能不够直观,接下来我想简单通过两张图阐述一下Kafka消息发送相关的核心原理。


首先,我们来看一下消息发送者相关的数据结构:

e95439e23f6a8dbf7a2d85dbff234cf2.png

Kafka的每一个消息发送者,也就是KafkaProducer对象内部会有一块缓存区,其总大小由buffer.memory指定,默认为32M,但内存的组织会按照topic+parition构建双端队列,队列中的每一个元素为一个ProducerBatch对象,表示一个消息发送批次,但发送线程将消息发送到Broker端时,一次可以包含多个批次。一次允许发送的消息总大小受max.request.size控制,默认为1M。


在了解了核心数据结构后,我们再看一下各个核心参数在消息发送的各个阶段是如何工作的。

7b6b32a257223293301a4fbc0c6dfae2.png


2.3 性能优化


从Kafka Producer 的工作原理来看,在客户端所谓的性能优化,其实就是延迟、吞吐率、数据完整性的一个权衡。在具体的实践中通常可以调整的参数主要如下:


  • acks 这个只能是根据业务的特点,对数据丢失的容忍度,通常该参数在实践过程中遇到性能瓶颈后,调整该参数的可能性几乎没有,因为需要牺牲数据的完整性,此举并不是一个好的方案
  • batch.size 与 linger.ms
    通常可以适当修改batch.size与linger.ms的值,特别是linger.ms值,牺牲一定的延时,方便更多数据进入到Batch,从而提高Sender线程一次发送的数据大小,提高带宽,显著提高吞吐率,但牺牲延时。当然如果是想提高响应延迟,则采取的手段则恰恰相反
  • buffer.memory、max.request.size
    如果需要进一步提高吞吐量,可以适当提高buffer.memory的大小,让客户端能缓存更多数据,并且调高max.request.size,进一步提高单次消息发送的消息量。


从工作原理图中要得到上述的方式并不难,但我们有没有办法正确的评估到底是需要调整batch.size,还是要调整uffer.memory呢?


相关文章
|
3月前
|
消息中间件 SQL 分布式计算
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
大数据-62 Kafka 高级特性 主题 kafka-topics相关操作参数 KafkaAdminClient 偏移量管理
44 6
|
3月前
|
消息中间件 存储 负载均衡
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
大数据-60 Kafka 高级特性 消息消费01-消费组图例 心跳机制图例 附参数详解与建议值
74 3
|
8月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
213 2
|
3月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
105 0
Kafka ISR机制详解!
|
4月前
|
消息中间件 Kafka 测试技术
Kafka常用命令大全及kafka-console-consumer.sh及参数说明
该文章汇总了Kafka常用命令,包括集群管理、Topic操作、生产者与消费者的命令行工具使用方法等,适用于Kafka的日常运维和开发需求。
737 2
|
3月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
252 0
|
5月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
58 3
|
5月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
183 3
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
112 7
|
5月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
141 4

热门文章

最新文章