producer参数---Kafka从入门到精通(七)

简介: producer参数---Kafka从入门到精通(七)

上篇文章说了,kafka需要先构造properties指定serverkafka集群,key valuestringSerialize序列化,通过producer发送send,需要records参数指定topicvalue,之后发送消息,有异步和同步,最后关闭。

构造producer---Kafka从入门到精通(六)


一、producer参数


除了前面说的三个servers,和key.serializervalue.serializer外,java版本还有很多其他重要参数。


Acks

Acks参数控制producer生产消息的持久化(durability)。对于任何producer而言,kafka在乎的是“已提交”消息持久化,一旦消息被成功提交,那么只有任何一个保存该消息的副本存在数据,则不会丢失。经常碰到用户抱怨kafkaproducer消息丢失,其实这里概念混淆,对于这些消失的消息并没有成功写入kafka,换句话说,他们并没有成功提交。当然,producerapi提供了回调机制解决发送失败的请求数据。

Producer发送消息给kafka集群时,这条消息会指定topic分区leader所在的brokerproducer等待从该leader broker返回消息写入结果,(并不是无限等待,有超时时间),以确定消息发送成功。当收到消息后,producer才可以继续发送消息,kafka和关系型数据库的事务类型,永远不会消费未提交的数据。

显然,leader broker何时发送结果返回给producer,这个关系到整个kafka的吞吐量,所以这个参数就是为了控制这件事,acks有三个参数,01-1all)。

Acks =0:设置成0 代表producer完全不理睬leader broker端处理结果,此时producer发送一条之后立马发送下一条消息,不需要等待leader broker返回结果,因此这种情况下的producer回调也会失去作用,所以acks的时候,并不会保证消息发送成功,但这种情况下 吞吐量是最高的。

Acks=-1或者all:表示发送消息后,leader broker不仅会把消息写入本地日志,同时也会把ISR中其他副本都成功写入他们各自的本地日志,才发送消息给producer,显然只要在ISR中有一个副本存处于存货状态,这时候消息就肯定不会丢失,达到最高持久化。

Acks=1:这是一种折中方式,当leader broker把消息写入本地日志后,则会直接返回给producer,无须等待ISR中的其他副本写入该消息,所以只要leader broker没有宕机,数据就不会丢失。这种方法即可以保证消息持久性,同时也可以保证吞吐量。


不光在java配置文件可以设置,在properties也可以设置,值得注意的是,如果在properties里设置,该参数是字符串,否则会报错。


Buffer.memory

该参数指定是producer端用于缓存消息的缓冲大小,单位是字节,默认是33554432,即是32MB。如上所述,采用异步发送消息的设计架构,java版本的producer会在启动的时候 先创建一块内存缓存区用于保存待发送的消息(mysql也是在服务器启动的时候会创建buffer pool缓存区),然后由另一个专属线程再去负责从缓冲区真正的执行发送,这部分空间则由buffer.memory参数指定,若producer向缓冲区写消息的速度超过了专属I/O线程发送消息的速度,那么必然会内存里数据会不断增大,此时producer会停止手头上的工作等待I/O线程执行,若一段时间还未执行完毕,则producer则会抛出异常期望用户介入。

虽然producer在工作过程中会用到很多部分内存,但我们几乎可用认定该参数指定的内存大小就是producer程序使用的内存大小。若producer程序要发送很多分区,那么就需要仔细设置该参数,防止内存过小降低了整个producer的吞吐量。


Compression.type

这个参数设置producer端是否压缩消息,默认值是none,即不压缩消息,和任何系统相同的是,kafkaproducer端引入压缩后可以显著降低I/O网络传输开销,从而提升整个吞吐量,但也会增加producer端机器cpu的开销。另外,如果broker端压缩参数设置的与producer不同时候,broker端写入消息的时候额外占用cpu资源对消息进行解压缩-重压缩操作。

目前kafka支持三种压缩方法,GZIP/snappyLZ4,根据实际应用场景来看,producer结合LZ4性能最好。对于kafka1.0.0版本而言,参数最好设置为LZ4Kafka20168月份的时候,开发了Zstander来压缩producer消息,相信这种会在后续版本中效率是最高的。


Retries

Broker在处理写入请求的时候可能因为瞬时故障(比如kafkaleader选举或者网络抖动)导致消息发送失败。这种故障通常可以自行恢复,如果把这种错误封装进入回调函数,producer也是 自己处理重新发送,所以与其这样,还不如kafka内部自己通过这个参数来自身调用,当然前提是要设置reties参数,0以上才会重试。不过设置这个参数时候,有两点需要注意:

1、重试可能造成消息重复发送:比如由于顺时的网络抖动使得broker端已经成功写入消息,但是没有发送给producer,这时候producer会认为发送消息失败,这就需要consumer方式重复消费。但kafka0.11.0.0版本开始支持“精准一次”处理语义,从而在设计上避免了此类问题。

2、重试可能造成乱序:当producer将多个消息发送,在缓存中时候,如果发生了消息重试,可能造成消息流乱序。为了避免乱序,java版本producer提供了max.in.flight.request.per.connection参数,一旦吧该参数设置成1,表示producer在某一时刻只能发送一次。

另外两次重试时间会有间隔,防止频繁调用对系统带来的冲击,这段时间也可以通过retry.backoff.ms来指定,默认是100ms。由于leader选举换届是最常见的瞬时错误,建议通过测试来计算平均leader选举时间,根据改时间来设定retiesretry.backff.ms


Batch.size

Batch.sizeproducer重要参数之一,他对于调优producer吞吐量和延迟性有着重要意义,前面说过,producer会将发往同一个分区多条消息封装进一个batch中,当batch满了的时候,当batch满了,会发送所有消息,不过不是总等待batch满了才发送,只要batch在空闲时间就会发送。

所以当batch设置过小的时候,一次请求发送的数据也少,吞吐量则会比较低。单若一个batch非常巨大的时候,那么内存也会带来更大的压力,因为 不管是否能够填充满,producer都会为该batch分配固定大小的内存,因此batch.size参数设置其实是一种时间与空间权衡的体现。默认值是16384,即16kb。这是一个很保守的数字,在实际生活中设置,通常producer吞吐量会有相应提升。


Linger.ms

上面说到batch.size时,我们提到了消息没有被填满也可以batch发送的情况,这是为什么呢,难道不是满了发送才好嘛,实际这是一种权衡,即吞吐量 和 延时之间的权衡。当前参数就是控制消息发送延迟的权衡,默认是0。大多数情况下这是合理的,不必等待是否填满,直接发送消息就好,不过这样会拉低吞吐量,可以设置成100ms


Max.request.size

改参数在官方文档说的是,控制producer参数发送请求的大小,实际上是控制producer端发送参数最大消息。由于请求有一些头部信息,所以包含一条消息大小要比消息本体大,可以理解他当做消息最大尺寸安全。如果producer要发送消息的尺寸很大,那么这个参数就要被设置的,默认是1048576字节大小,通常无法满足企业级消息大小要求。可以后面加个0设置。


Request.timeout.ms

producer发送请求消息给broker后,broker需要在规定时间范围内将结果返回给producer,这段时间则由改参数控制,默认是30秒。也就是说30秒内如果broker没有返回消息给producer,则会在回调函数抛出超时异常。

默认设置通常情况足够的,但是遇到发送负载很大的数据,这时候就需要考虑调整改参数,调高。

相关文章
|
7月前
|
消息中间件 存储 NoSQL
kafka整合springboot以及核心参数的使用
kafka整合springboot以及核心参数的使用
237 0
|
6月前
|
消息中间件 缓存 Kafka
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?
31 0
|
2月前
|
消息中间件 Kafka
Kafka - 3.x Producer 生产者最佳实践
Kafka - 3.x Producer 生产者最佳实践
49 0
|
3月前
|
消息中间件 缓存 Kafka
kafka源码解析——第一篇:producer
kafka源码解析——第一篇:producer
34 0
|
3月前
|
消息中间件 Kafka Apache
Kafka 架构深度解析:生产者(Producer)和消费者(Consumer)
Apache Kafka 作为分布式流处理平台,其架构中的生产者和消费者是核心组件,负责实现高效的消息生产和消费。本文将深入剖析 Kafka 架构中生产者和消费者的工作原理、核心概念以及高级功能。
|
3月前
|
消息中间件 Kafka 流计算
在Flink我从holo读取数据,数据往kafka 写,好像差12个小时, 有没有参数哪里可以配置 ?
在Flink我从holo读取数据,数据往kafka 写,好像差12个小时, 有没有参数哪里可以配置 ?
25 1
|
6月前
|
消息中间件 存储 Kafka
(四)kafka从入门到精通之安装教程
Kafka是一个高性能、低延迟、分布式的分布式数据库,可以在分布式环境中实现数据的实时同步和分发。Zookeeper是一种开源的分布式数据存储系统,它可以在分布式环境中存储和管理数据库中的数据。它的主要作用是实现数据的实时同步和分发,可以用于实现分布式数据库、分布式文件系统、分布式日志系统等。Zookeeper的设计目标是高可用性、高性能、低延迟,它支持多种客户端协议,包括TCP和HTTP,可以方便地与其他分布式系统进行集成。
82 0
|
6月前
|
消息中间件 传感器 Kafka
(三)kafka从入门到精通之使用场景
Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。
42 0
|
6月前
|
消息中间件 缓存 Java
聊聊 Kafka: Producer 的网络模型
聊聊 Kafka: Producer 的网络模型
|
6月前
|
消息中间件 缓存 Java
聊聊 Kafka:Producer Metadata 读取与更新机制
聊聊 Kafka:Producer Metadata 读取与更新机制