Spark Streaming 数据产生与导入相关的内存分析

简介: 这篇文章描述Spark Streaming 的Receiver在内存方面的表现。

前言

我这篇文章会分几个点来描述Spark Streaming 的Receiver在内存方面的表现。
  • 一个大致的数据接受流程
  • 一些存储结构的介绍
  • 哪些点可能导致内存问题,以及相关的配置参数
另外,有位大牛写了 Spark Streaming 源码解析系列,我觉得写的不错,这里也推荐下。
我在部门尽力推荐使用Spark Streaming做数据处理,目前已经应用在日志处理,机器学习等领域。这期间也遇到不少问题,尤其是Kafka在接受到的数据量非常大的情况下,会有一些内存相关的问题。
另外特别说明下,我们仅仅讨论的是High Level的Kafka Stream,也就是输入流通过如下方式创建:
KafkaUtils.createStream
并且不开启WAL的情况下。

数据接受流程

启动Spark Streaming(后续缩写为SS)后,SS 会选择一台Executor 启动ReceiverSupervisor,并且标记为Active状态。接着按如下步骤处理:
  1. ReceiverSupervisor会启动对应的Receiver(这里是KafkaReceiver)
  2. KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用 ReceiverSupervisor.store 方法填充数据,注意,这里是一条一条填充的。
  3. ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。
到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator 中。

BlockGenerator 存储结构

BlockGenerator 会复杂些,这里有几个点,
  1. 维护了一个缓存 currentBuffer ,就是一个无限长度的ArrayBuffer。currentBuffer  并不会被复用,而是每次都会新建,然后把老的对象直接封装成Block,BlockGenerator会负责保证currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate。这个是Spark内存控制的第一道防线,填充currentBuffer 是阻塞的,消费Kafka的线程直接做填充。
  2. 维护了一个 blocksForPushing 队列, size 默认为10个(1.5.1版本),可通过 spark.streaming.blockQueueSize 进行配置。该队列主要用来实现生产-消费模式。每个元素其实是一个currentBuffer形成的block。
  3. blockIntervalTimer 是一个定时器。其实是一个生产者,负责将currentBuffer 的数据放到 blocksForPushing 中。通过参数 spark.streaming.blockInterval 设置,默认为200ms。放的方式很简单,直接把currentBuffer做为Block的数据源。这就是为什么currentBuffer不会被复用。
  4. blockPushingThread 也是一个定时器,负责将Block从blocksForPushing取出来,然后交给BlockManagerBasedBlockHandler.storeBlock 方法。10毫秒会取一次,不可配置。到这一步,才真的将数据放到了Spark的BlockManager中。
步骤描述完了,我们看看有哪些值得注意的地方。
currentBuffer
首先自然要说下currentBuffer,如果200ms期间你从Kafka接受的数据足够大,则足以把内存承包了。而且currentBuffer使用的并不是spark的storage内存,而是有限的用于运算存储的内存。 默认应该是 heap*0.4。除了把内存搞爆掉了,还有一个是GC。导致receiver所在的Executor 极容易挂掉,处理速度也巨慢。 如果你在SparkUI发现Receiver挂掉了,考虑有没有可能是这个问题。
blocksForPushing
blocksForPushing 这个是作为currentBuffer 和BlockManager之间的中转站。默认存储的数据最大可以达到  10*currentBuffer 大小。一般不打可能,除非你的 spark.streaming.blockInterval 设置的比10ms 还小,官方推荐最小也要设置成 50ms,你就不要搞对抗了。所以这块不用太担心。
blockPushingThread
blockPushingThread 负责从 blocksForPushing 获取数据,并且写入 BlockManager 。这里很蛋疼的事情是,blockPushingThread只写他自己所在的Executor的 blockManager,也就是每个batch周期的数据都会被 一个Executor给扛住了。 这是导致内存被撑爆的最大风险。 也就是说,每个batch周期接受到的数据最好不要超过接受Executor的内存(Storage)的一半。否则有你受的。我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了。  建议Spark-Streaming团队最好是能将数据写入到多个BlockManager上。
StorageLevel 的配置问题
另外还有几个值得注意的问题:
  • 如果你配置成Memory_Disk ,如果Receiver所在的Executor一旦挂掉,你也歇菜了,整个Spark Streaming作业会失败。失败的原因是一部分block找不到了。
  • 如果你配置成Memory_Disk_2,数据会被replication到不同的节点。一般而言不会出现作业失败或者丢数据。但解决不了Receiver也容易挂的问题,当然还是主要还是内存引起的。
  • 最好是采用默认设置 MEMORY_AND_DISK_SER_2 比较靠谱些。
  • 这里面还有一个风险点就是,如果某个batch processing延迟了,那么对应的BlockManager的数据不会被释放,然后下一个batch的数据还在进,也会加重内存问题。

动态控制消费速率以及相关论文

另外,spark的消费速度可以设置上限以外,亦可以根据processing time 来动态调整。通过 spark.streaming.backpressure.enabled 设置为true 可以打开。算法的论文可参考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing  ,还是有用的,我现在也都开启着。
Spark里除了这个 Dynamic,还有一个就是Dynamic Allocation,也就是Executor数量会根据资源使用情况,自动伸缩。我其实蛮喜欢Spark这个特色的。具体的可以查找下相关设计文档。

后话

接下来一篇文章会讲一些解决方案。
目录
相关文章
|
4月前
|
Web App开发 监控 JavaScript
监控和分析 JavaScript 内存使用情况
【10月更文挑战第30天】通过使用上述的浏览器开发者工具、性能分析工具和内存泄漏检测工具,可以有效地监控和分析JavaScript内存使用情况,及时发现和解决内存泄漏、过度内存消耗等问题,从而提高JavaScript应用程序的性能和稳定性。在实际开发中,可以根据具体的需求和场景选择合适的工具和方法来进行内存监控和分析。
|
10天前
|
存储 Java
课时4:对象内存分析
接下来对对象实例化操作展开初步分析。在整个课程学习中,对象使用环节往往是最棘手的问题所在。
|
10天前
|
Java 编译器 Go
go的内存逃逸分析
内存逃逸分析是Go编译器在编译期间根据变量的类型和作用域,确定变量分配在堆上还是栈上的过程。如果变量需要分配在堆上,则称作内存逃逸。Go语言有自动内存管理(GC),开发者无需手动释放内存,但编译器需准确分配内存以优化性能。常见的内存逃逸场景包括返回局部变量的指针、使用`interface{}`动态类型、栈空间不足和闭包等。内存逃逸会影响性能,因为操作堆比栈慢,且增加GC压力。合理使用内存逃逸分析工具(如`-gcflags=-m`)有助于编写高效代码。
|
15天前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
4月前
|
并行计算 算法 测试技术
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面
C语言因高效灵活被广泛应用于软件开发。本文探讨了优化C语言程序性能的策略,涵盖算法优化、代码结构优化、内存管理优化、编译器优化、数据结构优化、并行计算优化及性能测试与分析七个方面,旨在通过综合策略提升程序性能,满足实际需求。
108 1
|
4月前
|
JavaScript
如何使用内存快照分析工具来分析Node.js应用的内存问题?
需要注意的是,不同的内存快照分析工具可能具有不同的功能和操作方式,在使用时需要根据具体工具的说明和特点进行灵活运用。
85 3
|
4月前
|
开发框架 监控 .NET
【Azure App Service】部署在App Service上的.NET应用内存消耗不能超过2GB的情况分析
x64 dotnet runtime is not installed on the app service by default. Since we had the app service running in x64, it was proxying the request to a 32 bit dotnet process which was throwing an OutOfMemoryException with requests >100MB. It worked on the IaaS servers because we had the x64 runtime install
|
4月前
|
Web App开发 JavaScript 前端开发
使用 Chrome 浏览器的内存分析工具来检测 JavaScript 中的内存泄漏
【10月更文挑战第25天】利用 Chrome 浏览器的内存分析工具,可以较为准确地检测 JavaScript 中的内存泄漏问题,并帮助我们找出潜在的泄漏点,以便采取相应的解决措施。
614 9
|
5月前
|
并行计算 算法 IDE
【灵码助力Cuda算法分析】分析共享内存的矩阵乘法优化
本文介绍了如何利用通义灵码在Visual Studio 2022中对基于CUDA的共享内存矩阵乘法优化代码进行深入分析。文章从整体程序结构入手,逐步深入到线程调度、矩阵分块、循环展开等关键细节,最后通过带入具体值的方式进一步解析复杂循环逻辑,展示了通义灵码在辅助理解和优化CUDA编程中的强大功能。
|
5月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
289 2