【夯实Kafka实战性能调优技能】消息队列服务端出现内存溢出OOM以及相关性能调优实战分析

简介: 【夯实Kafka实战性能调优技能】消息队列服务端出现内存溢出OOM以及相关性能调优实战分析

内存问题


本篇文章介绍Kafka处理大文件出现内存溢出 java.lang.OutOfMemoryError: Direct buffer memory,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。



bin目录下的kafka-run-class.sh中须要配置的参数


kafka是由scala和java编写的。因此须要调一些jvm的参数。java的内存分为堆内内存和堆外内存。



JVM参数系列


  • -Xms2048m, -Xmx2048m,设置的是堆内内存。


  • -Xms是初始可用的最大堆内内存。-Xmx设置的是最大可用的堆内内存。两者设置成同样是由于效率问题,可让jvm少作一些运算。若是这两个参数设置的过小,kafka会出现java.lang.OutOfMemoryError: Java heap space的错误。


  • -XX:MaxDirectMemorySize=8192m。这个参数配置的过小,kafka会出现java.lang.OutOfMemoryError: Direct buffer memory的错误。 由于kafka的网络IO使用了java的nio中的DirectMemory的方式,而这个申请的是堆外内存。




producer端


Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?



配置建议:


  1. (传输资源文件的位置-再消费的时候进行获取真正的资源)首选的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。


  1. (传输资源文件的进行数据流的拆分,分批次发送)将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
  2. (传输资源文件的进行数据流压缩,降低传输量)Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。



不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:


broker端


配置文件须要配置的参数


  • message.max.bytes (默认:1000000) : kafka会接收单个消息size的最大限制, 默认为1M左右。若是producer发送比这个大的消息,kafka默认会丢掉。producer能够从callback函数中得到错误码:10。设置了之后控制了broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。


  • log.segment.bytes(默认: 1GB)  : kafka数据文件的大小。默认为1G, 须要确保此值大于一个消息的最大大小。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。


  • replica.fetch.max.bytes (默认: 1MB) : broker可复制的消息的最大字节数, 默认为1M。这个值比message.max.bytes大,不然broker会接收此消息,但没法将此消息复制出去,从而形成数据丢失。


注意:message.max.bytes,要设置大于发送最大数据的大小,不然会produce失败




consumer端


broker为什么会返回总量为1000大小的数据呢?


librdkafka有这样一个参数:fetch.max.bytes, 它有这样的描述:


Maximum amount of data the broker shall return for a Fetch request. Messages are fetched in batches by the consumer and if the first message batch in the first non-empty partition of the Fetch request is larger than this value, then the message batch will still be returned to ensure the consumer can make progress. The maximum message batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (broker topic config). fetch.max.bytes is automatically adjusted upwards to be at least message.max.bytes (consumer config).



配置的参数


receive.message.max.bytes(默认 1MB)  : kafka协议response的最大长度,也是消费者能读取的最大消息。这个值应该大于或等于message.max.bytes,不然消费会失败。


  • 较低版本的librdkafka的receive.message.max.bytes只支持1000到1000000000。
  • 最新版本的能够支持到2147483647。


否则会出现 “Receive failed: Invalid message size 1047207987 (0..1000000000): increase receive.message.max.bytes”这样的错误。


  • 使用此参数的时候还须要注意一个问题,在broker端设置的message.max.bytes为1000,consumer端设置的receive.message.max.bytes也为1000,可是除了数据,response还有协议相关字段,这时候整个response的大小就会超过1000。


注意:一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。



性能影响


  • 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点


可用的内存和分区数造成OOM的场景


  • Brokers会为每个分区分配replica.fetch.max.bytes参数指定的内存空间,假设replica.fetch.max.bytes=1M,且有1000个分区,则需要差不多1G的内存,确保分区数 * 最大的消息不会超过服务器的内存,否则会报OOM错误。


  • 消费端的fetch.message.max.bytes指定了最大消息需要的内存空间,同样,分区数 * 最大需要内存空间不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。



垃圾回收


到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。


推荐使用1.7出来的G1垃圾回收机制代替CMS。



  • G1>CMS的优势


  • G1在压缩空间方面有优势


  • G1通过将内存空间分成区域(Region)的方式避免内存碎片问题


  • Eden, Survivor, Old区不再固定、在内存使用效率上来说更灵活


  • G1可以通过设置预期停顿时间(Pause Time)来控制垃圾收集时间避免应用雪崩现象


  • G1在回收内存后会马上同时做合并空闲内存的工作、而CMS默认是在STW(stop the world)
  • 的时候做


  • G1会在Young GC中使用、而CMS只能在O区使用



  • G1适合的场景:

  • 服务端多核CPU、JVM内存占用较大的应用(至少大于4G)


  • 应用在运行过程中会产生大量内存碎片、需要经常压缩空间


  • 想要更可控、可预期的GC停顿周期;防止高并发下应用雪崩现象


  • 我们的kafka的kafka-run-class.sh 中已经包含了
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
复制代码


所以只需要修改kafka-server-start.sh。这里面将内存设置为4G,因为当前kafka的堆内存使用了800多M,1个G的内存不够用。但是分配太多,也没什么用,还容易影响到pagecache,降低效率:

export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
复制代码




内存方面须要考虑的问题


Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.

The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM.


若是一个消息须要的处理时间很长,broker会认为consumer已经挂了,把partition分配给其余的consumer,而后循环往复, 这条record永远无法消费。方法是增长max.poll.interval.ms 参数。




提高性能调优能力


Don't fear the filesystem!中提到kafka使用page cache进行文件存储。计算机的内存分为虚拟内存和物理内存。物理内存是真实的内存,虚拟内存是用磁盘来代替内存。并通过swap机制实现磁盘到物理内存的加载和替换,这里面用到的磁盘我们称为swap磁盘。


pageCache机制


在写文件的时候,Linux首先将数据写入没有被使用的内存中,这些内存被叫做内存页(page cache)。然后读的时候,Linux会优先从page cache中查找,如果找不到就会从硬盘中查找。


当物理内存使用达到一定的比例后,Linux就会使用进行swap,使用磁盘作为虚拟内存。通过cat /proc/sys/vm/swappiness可以看到swap参数。这个参数表示虚拟内存中swap磁盘占了多少百分比。0表示最大限度的使用内存,100表示尽量使用swap磁盘。


系统默认的参数是60,当物理内存使用率达到40%,就会频繁进行swap,影响系统性能,推荐将vm.swappiness 设置为较低的值1。最终我设置为10,因为我们的机器的内存还是比较小的,只有40G,设置的太小,可能会影响到虚拟内存的使用吧。




脏文件


当大量的持续不断的数据写入cache内存中后,这些数据就被称为脏数据。需要尽快将这些脏数据flush到磁盘中,释放内存。


这里需要关注两个参数:


  • vm.dirty_background_ratio:这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发pdflush/flush/kdmflush等后台回写进程运行,将一定缓存的脏页异步地刷入外存;


  • vm.dirty_ratio:这个参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。


这里推荐将vm.dirty_background_ratio设置为5, vm.dirty_ratio有的人设置为10,但是我觉得太小了,还是默认的就可以了。




网络


kafka集群对网络的要求比较高,可以将socket的缓冲设置为原来的两倍。


  • net.core.wmem_default 设置为128K
  • net.core.rmem_default 设置为128K



学习资料





相关文章
|
5月前
|
存储 弹性计算 缓存
阿里云服务器ECS经济型、通用算力、计算型、通用和内存型选购指南及使用场景分析
本文详细解析阿里云ECS服务器的经济型、通用算力型、计算型、通用型和内存型实例的区别及适用场景,涵盖性能特点、配置比例与实际应用,助你根据业务需求精准选型,提升资源利用率并降低成本。
441 3
|
1月前
|
设计模式 缓存 Java
【JUC】(4)从JMM内存模型的角度来分析CAS并发性问题
本篇文章将从JMM内存模型的角度来分析CAS并发性问题; 内容包含:介绍JMM、CAS、balking犹豫模式、二次检查锁、指令重排问题
102 1
|
4月前
|
存储 人工智能 自然语言处理
AI代理内存消耗过大?9种优化策略对比分析
在AI代理系统中,多代理协作虽能提升整体准确性,但真正决定性能的关键因素之一是**内存管理**。随着对话深度和长度的增加,内存消耗呈指数级增长,主要源于历史上下文、工具调用记录、数据库查询结果等组件的持续积累。本文深入探讨了从基础到高级的九种内存优化技术,涵盖顺序存储、滑动窗口、摘要型内存、基于检索的系统、内存增强变换器、分层优化、图形化记忆网络、压缩整合策略以及类操作系统内存管理。通过统一框架下的代码实现与性能评估,分析了每种技术的适用场景与局限性,为构建高效、可扩展的AI代理系统提供了系统性的优化路径和技术参考。
247 4
AI代理内存消耗过大?9种优化策略对比分析
|
8月前
|
Java 编译器 Go
go的内存逃逸分析
内存逃逸分析是Go编译器在编译期间根据变量的类型和作用域,确定变量分配在堆上还是栈上的过程。如果变量需要分配在堆上,则称作内存逃逸。Go语言有自动内存管理(GC),开发者无需手动释放内存,但编译器需准确分配内存以优化性能。常见的内存逃逸场景包括返回局部变量的指针、使用`interface{}`动态类型、栈空间不足和闭包等。内存逃逸会影响性能,因为操作堆比栈慢,且增加GC压力。合理使用内存逃逸分析工具(如`-gcflags=-m`)有助于编写高效代码。
173 2
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
467 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
318 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
1142 9
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
252 3

相关产品

  • 云消息队列 Kafka 版