【夯实Kafka实战性能调优技能】消息队列服务端出现内存溢出OOM以及相关性能调优实战分析

简介: 【夯实Kafka实战性能调优技能】消息队列服务端出现内存溢出OOM以及相关性能调优实战分析

内存问题


本篇文章介绍Kafka处理大文件出现内存溢出 java.lang.OutOfMemoryError: Direct buffer memory,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。



bin目录下的kafka-run-class.sh中须要配置的参数


kafka是由scala和java编写的。因此须要调一些jvm的参数。java的内存分为堆内内存和堆外内存。



JVM参数系列


  • -Xms2048m, -Xmx2048m,设置的是堆内内存。


  • -Xms是初始可用的最大堆内内存。-Xmx设置的是最大可用的堆内内存。两者设置成同样是由于效率问题,可让jvm少作一些运算。若是这两个参数设置的过小,kafka会出现java.lang.OutOfMemoryError: Java heap space的错误。


  • -XX:MaxDirectMemorySize=8192m。这个参数配置的过小,kafka会出现java.lang.OutOfMemoryError: Direct buffer memory的错误。 由于kafka的网络IO使用了java的nio中的DirectMemory的方式,而这个申请的是堆外内存。




producer端


Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?



配置建议:


  1. (传输资源文件的位置-再消费的时候进行获取真正的资源)首选的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。


  1. (传输资源文件的进行数据流的拆分,分批次发送)将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
  2. (传输资源文件的进行数据流压缩,降低传输量)Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。



不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:


broker端


配置文件须要配置的参数


  • message.max.bytes (默认:1000000) : kafka会接收单个消息size的最大限制, 默认为1M左右。若是producer发送比这个大的消息,kafka默认会丢掉。producer能够从callback函数中得到错误码:10。设置了之后控制了broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。


  • log.segment.bytes(默认: 1GB)  : kafka数据文件的大小。默认为1G, 须要确保此值大于一个消息的最大大小。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。


  • replica.fetch.max.bytes (默认: 1MB) : broker可复制的消息的最大字节数, 默认为1M。这个值比message.max.bytes大,不然broker会接收此消息,但没法将此消息复制出去,从而形成数据丢失。


注意:message.max.bytes,要设置大于发送最大数据的大小,不然会produce失败




consumer端


broker为什么会返回总量为1000大小的数据呢?


librdkafka有这样一个参数:fetch.max.bytes, 它有这样的描述:


Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config).



配置的参数


receive.message.max.bytes(默认 1MB)  : kafka协议response的最大长度,也是消费者能读取的最大消息。这个值应该大于或等于message.max.bytes,不然消费会失败。


  • 较低版本的librdkafka的receive.message.max.bytes只支持1000到1000000000。
  • 最新版本的能够支持到2147483647。


否则会出现 “Receive failed: Invalid message size 1047207987 (0..1000000000): increase receive.message.max.bytes”这样的错误。


  • 使用此参数的时候还须要注意一个问题,在broker端设置的message.max.bytes为1000,consumer端设置的receive.message.max.bytes也为1000,可是除了数据,response还有协议相关字段,这时候整个response的大小就会超过1000。


注意:一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。



性能影响


  • 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点


可用的内存和分区数造成OOM的场景


  • Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保分区数 * 最大的消息不会超过服务器的内存,否则会报OOM错误。


  • 消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数 * 最大需要内存空间不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。



垃圾回收


到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。


推荐使用1.7出来的G1垃圾回收机制代替CMS。



  • G1>CMS的优势


  • G1在压缩空间方面有优势


  • G1通过将内存空间分成区域(Region)的方式避免内存碎片问题


  • Eden, Survivor, Old区不再固定、在内存使用效率上来说更灵活


  • G1可以通过设置预期停顿时间(Pause Time)来控制垃圾收集时间避免应用雪崩现象


  • G1在回收内存后会马上同时做合并空闲内存的工作、而CMS默认是在STW(stop the world)
  • 的时候做


  • G1会在Young GC中使用、而CMS只能在O区使用



  • G1适合的场景:

  • 服务端多核CPU、JVM内存占用较大的应用(至少大于4G)


  • 应用在运行过程中会产生大量内存碎片、需要经常压缩空间


  • 想要更可控、可预期的GC停顿周期;防止高并发下应用雪崩现象


  • 我们的kafka的kafka-run-class.sh 中已经包含了
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
复制代码


所以只需要修改kafka-server-start.sh。这里面将内存设置为4G,因为当前kafka的堆内存使用了800多M,1个G的内存不够用。但是分配太多,也没什么用,还容易影响到pagecache,降低效率:

export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
复制代码




内存方面须要考虑的问题


Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.

The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM.


若是一个消息须要的处理时间很长,broker会认为consumer已经挂了,把partition分配给其余的consumer,而后循环往复, 这条record永远无法消费。方法是增长max.poll.interval.ms 参数。




提高性能调优能力


Don't fear the filesystem!中提到kafka使用page cache进行文件存储。计算机的内存分为虚拟内存和物理内存。物理内存是真实的内存,虚拟内存是用磁盘来代替内存。并通过swap机制实现磁盘到物理内存的加载和替换,这里面用到的磁盘我们称为swap磁盘。


pageCache机制


在写文件的时候,Linux首先将数据写入没有被使用的内存中,这些内存被叫做内存页(page cache)。然后读的时候,Linux会优先从page cache中查找,如果找不到就会从硬盘中查找。


当物理内存使用达到一定的比例后,Linux就会使用进行swap,使用磁盘作为虚拟内存。通过cat /proc/sys/vm/swappiness可以看到swap参数。这个参数表示虚拟内存中swap磁盘占了多少百分比。0表示最大限度的使用内存,100表示尽量使用swap磁盘。


系统默认的参数是60,当物理内存使用率达到40%,就会频繁进行swap,影响系统性能,推荐将vm.swappiness 设置为较低的值1。最终我设置为10,因为我们的机器的内存还是比较小的,只有40G,设置的太小,可能会影响到虚拟内存的使用吧。




脏文件


当大量的持续不断的数据写入cache内存中后,这些数据就被称为脏数据。需要尽快将这些脏数据flush到磁盘中,释放内存。


这里需要关注两个参数:


  • vm.dirty_background_ratio:这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发pdflush/flush/kdmflush等后台回写进程运行,将一定缓存的脏页异步地刷入外存;


  • vm.dirty_ratio:这个参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。


这里推荐将vm.dirty_background_ratio设置为5, vm.dirty_ratio有的人设置为10,但是我觉得太小了,还是默认的就可以了。




网络


kafka集群对网络的要求比较高,可以将socket的缓冲设置为原来的两倍。


  • net.core.wmem_default 设置为128K
  • net.core.rmem_default 设置为128K



学习资料





相关文章
|
13天前
|
存储 安全 Java
synchronized原理-字节码分析、对象内存结构、锁升级过程、Monitor
本文分析的问题: 1. synchronized 字节码文件分析之 monitorenter、monitorexit 指令 2. 为什么任何一个Java对象都可以成为一把锁? 3. 对象的内存结构 4. 锁升级过程 (无锁、偏向锁、轻量级锁、重量级锁) 5. Monitor 是什么、源码查看(hotspot虚拟机源码) 6. JOL工具使用
|
6天前
|
Java
堆内存的溢出案例分析
堆内存的溢出案例分析
4 0
|
7天前
|
JSON 数据管理 测试技术
自动化测试工具Selenium Grid的深度应用分析深入理解操作系统的内存管理
【5月更文挑战第28天】随着互联网技术的飞速发展,软件测试工作日益复杂化,传统的手工测试已无法满足快速迭代的需求。自动化测试工具Selenium Grid因其分布式执行特性而受到广泛关注。本文旨在深入剖析Selenium Grid的工作原理、配置方法及其在复杂测试场景中的应用优势,为测试工程师提供高效测试解决方案的参考。
|
20天前
|
存储 Arthas 监控
JVM工作原理与实战(三十):堆内存状况的对比分析
JVM作为Java程序的运行环境,其负责解释和执行字节码,管理内存,确保安全,支持多线程和提供性能监控工具,以及确保程序的跨平台运行。本文主要介绍了堆内存状况的对比分析、产生内存溢出的原因等内容。
16 0
|
20天前
|
缓存 Java 开发工具
OOM out of memory 内存溢出
OOM out of memory 内存溢出
21 1
|
20天前
|
消息中间件 存储 传感器
Kafka消息队列原理及应用详解
【5月更文挑战第6天】Apache Kafka是高性能的分布式消息队列,常用于实时数据管道和流应用。它提供高性能、持久化、分布式和可伸缩的消息处理,支持解耦、异步通信和流量控制。Kafka的核心概念包括Broker、Topic、Partition、Producer、Consumer和Consumer Group。其特点是高吞吐、低延迟、数据持久化、分布式架构和容错性。常见应用包括实时数据流处理、日志收集、消息传递和系统间数据交换。
|
20天前
|
缓存 Linux
linux性能分析之内存分析(free,vmstat,top,ps,pmap等工具使用介绍)
这些工具可以帮助你监视系统的内存使用情况、识别内存泄漏、找到高内存消耗的进程等。根据具体的问题和需求,你可以选择使用其中一个或多个工具来进行内存性能分析。注意,内存分析通常需要综合考虑多个指标和工具的输出,以便更好地理解系统的行为并采取相应的优化措施。
43 6
|
20天前
|
机器学习/深度学习 分布式计算 数据处理
Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
【5月更文挑战第2天】Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
30 3
|
18天前
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
93 4
|
1天前
|
存储
18.(C进阶)数据在内存中的存储
18.(C进阶)数据在内存中的存储

热门文章

最新文章

相关产品

  • 云消息队列 Kafka 版