上篇文章说了,kafka需要先构造properties指定server和kafka集群,key 和 value用stringSerialize序列化,通过producer发送send,需要records参数指定topic和value,之后发送消息,有异步和同步,最后关闭。
一、producer参数
除了前面说的三个servers,和key.serializer和value.serializer外,java版本还有很多其他重要参数。
Acks:
Acks参数控制producer生产消息的持久化(durability)。对于任何producer而言,kafka在乎的是“已提交”消息持久化,一旦消息被成功提交,那么只有任何一个保存该消息的副本存在数据,则不会丢失。经常碰到用户抱怨kafka的producer消息丢失,其实这里概念混淆,对于这些消失的消息并没有成功写入kafka,换句话说,他们并没有成功提交。当然,producer的api提供了回调机制解决发送失败的请求数据。
Producer发送消息给kafka集群时,这条消息会指定topic分区leader所在的broker,producer等待从该leader broker返回消息写入结果,(并不是无限等待,有超时时间),以确定消息发送成功。当收到消息后,producer才可以继续发送消息,kafka和关系型数据库的事务类型,永远不会消费未提交的数据。
显然,leader broker何时发送结果返回给producer,这个关系到整个kafka的吞吐量,所以这个参数就是为了控制这件事,acks有三个参数,0、1、-1(all)。
Acks =0:设置成0 代表producer完全不理睬leader broker端处理结果,此时producer发送一条之后立马发送下一条消息,不需要等待leader broker返回结果,因此这种情况下的producer回调也会失去作用,所以acks的时候,并不会保证消息发送成功,但这种情况下 吞吐量是最高的。
Acks=-1或者all:表示发送消息后,leader broker不仅会把消息写入本地日志,同时也会把ISR中其他副本都成功写入他们各自的本地日志,才发送消息给producer,显然只要在ISR中有一个副本存处于存货状态,这时候消息就肯定不会丢失,达到最高持久化。
Acks=1:这是一种折中方式,当leader broker把消息写入本地日志后,则会直接返回给producer,无须等待ISR中的其他副本写入该消息,所以只要leader broker没有宕机,数据就不会丢失。这种方法即可以保证消息持久性,同时也可以保证吞吐量。
不光在java配置文件可以设置,在properties也可以设置,值得注意的是,如果在properties里设置,该参数是字符串,否则会报错。
Buffer.memory:
该参数指定是producer端用于缓存消息的缓冲大小,单位是字节,默认是33554432,即是32MB。如上所述,采用异步发送消息的设计架构,java版本的producer会在启动的时候 先创建一块内存缓存区用于保存待发送的消息(mysql也是在服务器启动的时候会创建buffer pool缓存区),然后由另一个专属线程再去负责从缓冲区真正的执行发送,这部分空间则由buffer.memory参数指定,若producer向缓冲区写消息的速度超过了专属I/O线程发送消息的速度,那么必然会内存里数据会不断增大,此时producer会停止手头上的工作等待I/O线程执行,若一段时间还未执行完毕,则producer则会抛出异常期望用户介入。
虽然producer在工作过程中会用到很多部分内存,但我们几乎可用认定该参数指定的内存大小就是producer程序使用的内存大小。若producer程序要发送很多分区,那么就需要仔细设置该参数,防止内存过小降低了整个producer的吞吐量。
Compression.type:
这个参数设置producer端是否压缩消息,默认值是none,即不压缩消息,和任何系统相同的是,kafka的producer端引入压缩后可以显著降低I/O网络传输开销,从而提升整个吞吐量,但也会增加producer端机器cpu的开销。另外,如果broker端压缩参数设置的与producer不同时候,broker端写入消息的时候额外占用cpu资源对消息进行解压缩-重压缩操作。
目前kafka支持三种压缩方法,GZIP/snappy和LZ4,根据实际应用场景来看,producer结合LZ4性能最好。对于kafka1.0.0版本而言,参数最好设置为LZ4。Kafka在2016年8月份的时候,开发了Zstander来压缩producer消息,相信这种会在后续版本中效率是最高的。
Retries
Broker在处理写入请求的时候可能因为瞬时故障(比如kafka的leader选举或者网络抖动)导致消息发送失败。这种故障通常可以自行恢复,如果把这种错误封装进入回调函数,producer也是 自己处理重新发送,所以与其这样,还不如kafka内部自己通过这个参数来自身调用,当然前提是要设置reties参数,0以上才会重试。不过设置这个参数时候,有两点需要注意:
1、重试可能造成消息重复发送:比如由于顺时的网络抖动使得broker端已经成功写入消息,但是没有发送给producer,这时候producer会认为发送消息失败,这就需要consumer方式重复消费。但kafka0.11.0.0版本开始支持“精准一次”处理语义,从而在设计上避免了此类问题。
2、重试可能造成乱序:当producer将多个消息发送,在缓存中时候,如果发生了消息重试,可能造成消息流乱序。为了避免乱序,java版本producer提供了max.in.flight.request.per.connection参数,一旦吧该参数设置成1,表示producer在某一时刻只能发送一次。
另外两次重试时间会有间隔,防止频繁调用对系统带来的冲击,这段时间也可以通过retry.backoff.ms来指定,默认是100ms。由于leader选举换届是最常见的瞬时错误,建议通过测试来计算平均leader选举时间,根据改时间来设定reties和retry.backff.ms。
Batch.size
Batch.size是producer重要参数之一,他对于调优producer吞吐量和延迟性有着重要意义,前面说过,producer会将发往同一个分区多条消息封装进一个batch中,当batch满了的时候,当batch满了,会发送所有消息,不过不是总等待batch满了才发送,只要batch在空闲时间就会发送。
所以当batch设置过小的时候,一次请求发送的数据也少,吞吐量则会比较低。单若一个batch非常巨大的时候,那么内存也会带来更大的压力,因为 不管是否能够填充满,producer都会为该batch分配固定大小的内存,因此batch.size参数设置其实是一种时间与空间权衡的体现。默认值是16384,即16kb。这是一个很保守的数字,在实际生活中设置,通常producer吞吐量会有相应提升。
Linger.ms
上面说到batch.size时,我们提到了消息没有被填满也可以batch发送的情况,这是为什么呢,难道不是满了发送才好嘛,实际这是一种权衡,即吞吐量 和 延时之间的权衡。当前参数就是控制消息发送延迟的权衡,默认是0。大多数情况下这是合理的,不必等待是否填满,直接发送消息就好,不过这样会拉低吞吐量,可以设置成100ms。
Max.request.size
改参数在官方文档说的是,控制producer参数发送请求的大小,实际上是控制producer端发送参数最大消息。由于请求有一些头部信息,所以包含一条消息大小要比消息本体大,可以理解他当做消息最大尺寸安全。如果producer要发送消息的尺寸很大,那么这个参数就要被设置的,默认是1048576字节大小,通常无法满足企业级消息大小要求。可以后面加个0设置。
Request.timeout.ms
当producer发送请求消息给broker后,broker需要在规定时间范围内将结果返回给producer,这段时间则由改参数控制,默认是30秒。也就是说30秒内如果broker没有返回消息给producer,则会在回调函数抛出超时异常。
默认设置通常情况足够的,但是遇到发送负载很大的数据,这时候就需要考虑调整改参数,调高。