内存问题
本篇文章介绍Kafka处理大文件出现内存溢出 java.lang.OutOfMemoryError: Direct buffer memory,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。
bin目录下的kafka-run-class.sh中须要配置的参数
kafka是由scala和java编写的。因此须要调一些jvm的参数。java的内存分为堆内内存和堆外内存。
JVM参数系列
- -Xms2048m, -Xmx2048m,设置的是堆内内存。
- -Xms是初始可用的最大堆内内存。-Xmx设置的是最大可用的堆内内存。两者设置成同样是由于效率问题,可让jvm少作一些运算。若是这两个参数设置的过小,kafka会出现java.lang.OutOfMemoryError: Java heap space的错误。
- -XX:MaxDirectMemorySize=8192m。这个参数配置的过小,kafka会出现java.lang.OutOfMemoryError: Direct buffer memory的错误。 由于kafka的网络IO使用了java的nio中的DirectMemory的方式,而这个申请的是堆外内存。
producer端
Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
配置建议:
- (传输资源文件的位置-再消费的时候进行获取真正的资源)首选的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
- (传输资源文件的进行数据流的拆分,分批次发送)将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
- (传输资源文件的进行数据流压缩,降低传输量)Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
broker端
配置文件须要配置的参数
- message.max.bytes (默认:1000000) : kafka会接收单个消息size的最大限制, 默认为1M左右。若是producer发送比这个大的消息,kafka默认会丢掉。producer能够从callback函数中得到错误码:10。设置了之后控制了broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
- log.segment.bytes(默认: 1GB) : kafka数据文件的大小。默认为1G, 须要确保此值大于一个消息的最大大小。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
- replica.fetch.max.bytes (默认: 1MB) : broker可复制的消息的最大字节数, 默认为1M。这个值比message.max.bytes大,不然broker会接收此消息,但没法将此消息复制出去,从而形成数据丢失。
注意:message.max.bytes,要设置大于发送最大数据的大小,不然会produce失败。
consumer端
broker为什么会返回总量为1000大小的数据呢?
librdkafka有这样一个参数:fetch.max.bytes, 它有这样的描述:
Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config).
配置的参数
receive.message.max.bytes(默认 1MB) : kafka协议response的最大长度,也是消费者能读取的最大消息。这个值应该大于或等于message.max.bytes,不然消费会失败。
- 较低版本的librdkafka的
receive.message.max.bytes
只支持1000到1000000000。 - 最新版本的能够支持到2147483647。
否则会出现 “Receive failed: Invalid message size 1047207987 (0..1000000000): increase receive.message.max.bytes”这样的错误。
- 使用此参数的时候还须要注意一个问题,在broker端设置的message.max.bytes为1000,consumer端设置的receive.message.max.bytes也为1000,可是除了数据,response还有协议相关字段,这时候整个response的大小就会超过1000。
注意:一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
性能影响
- 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
可用的内存和分区数造成OOM的场景
- Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保分区数 * 最大的消息不会超过服务器的内存,否则会报OOM错误。
- 消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数 * 最大需要内存空间不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
垃圾回收
到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
推荐使用1.7出来的G1垃圾回收机制代替CMS。
- G1>CMS的优势
- G1在压缩空间方面有优势
- G1通过将内存空间分成区域(Region)的方式避免内存碎片问题
- Eden, Survivor, Old区不再固定、在内存使用效率上来说更灵活
- G1可以通过设置预期停顿时间(Pause Time)来控制垃圾收集时间避免应用雪崩现象
- G1在回收内存后会马上同时做合并空闲内存的工作、而CMS默认是在STW(stop the world)
- 的时候做
- G1会在Young GC中使用、而CMS只能在O区使用
- G1适合的场景:
- 服务端多核CPU、JVM内存占用较大的应用(至少大于4G)
- 应用在运行过程中会产生大量内存碎片、需要经常压缩空间
- 想要更可控、可预期的GC停顿周期;防止高并发下应用雪崩现象
- 我们的kafka的kafka-run-class.sh 中已经包含了
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" 复制代码
所以只需要修改kafka-server-start.sh。这里面将内存设置为4G,因为当前kafka的堆内存使用了800多M,1个G的内存不够用。但是分配太多,也没什么用,还容易影响到pagecache,降低效率:
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" 复制代码
内存方面须要考虑的问题
Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.
The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM.
若是一个消息须要的处理时间很长,broker会认为consumer已经挂了,把partition分配给其余的consumer,而后循环往复, 这条record永远无法消费。方法是增长max.poll.interval.ms 参数。
提高性能调优能力
Don't fear the filesystem!中提到kafka使用page cache进行文件存储。计算机的内存分为虚拟内存和物理内存。物理内存是真实的内存,虚拟内存是用磁盘来代替内存。并通过swap机制实现磁盘到物理内存的加载和替换,这里面用到的磁盘我们称为swap磁盘。
pageCache机制
在写文件的时候,Linux首先将数据写入没有被使用的内存中,这些内存被叫做内存页(page cache)。然后读的时候,Linux会优先从page cache中查找,如果找不到就会从硬盘中查找。
当物理内存使用达到一定的比例后,Linux就会使用进行swap,使用磁盘作为虚拟内存。通过cat /proc/sys/vm/swappiness可以看到swap参数。这个参数表示虚拟内存中swap磁盘占了多少百分比。0表示最大限度的使用内存,100表示尽量使用swap磁盘。
系统默认的参数是60,当物理内存使用率达到40%,就会频繁进行swap,影响系统性能,推荐将vm.swappiness 设置为较低的值1。最终我设置为10,因为我们的机器的内存还是比较小的,只有40G,设置的太小,可能会影响到虚拟内存的使用吧。
脏文件
当大量的持续不断的数据写入cache内存中后,这些数据就被称为脏数据。需要尽快将这些脏数据flush到磁盘中,释放内存。
这里需要关注两个参数:
- vm.dirty_background_ratio:这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发pdflush/flush/kdmflush等后台回写进程运行,将一定缓存的脏页异步地刷入外存;
- vm.dirty_ratio:这个参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。
这里推荐将vm.dirty_background_ratio设置为5, vm.dirty_ratio有的人设置为10,但是我觉得太小了,还是默认的就可以了。
网络
kafka集群对网络的要求比较高,可以将socket的缓冲设置为原来的两倍。
- net.core.wmem_default 设置为128K
- net.core.rmem_default 设置为128K