Spark Streaming 数据产生与导入相关的内存分析

简介: 这篇文章描述Spark Streaming 的Receiver在内存方面的表现。

前言

我这篇文章会分几个点来描述Spark Streaming 的Receiver在内存方面的表现。
  • 一个大致的数据接受流程
  • 一些存储结构的介绍
  • 哪些点可能导致内存问题,以及相关的配置参数
另外,有位大牛写了 Spark Streaming 源码解析系列,我觉得写的不错,这里也推荐下。
我在部门尽力推荐使用Spark Streaming做数据处理,目前已经应用在日志处理,机器学习等领域。这期间也遇到不少问题,尤其是Kafka在接受到的数据量非常大的情况下,会有一些内存相关的问题。
另外特别说明下,我们仅仅讨论的是High Level的Kafka Stream,也就是输入流通过如下方式创建:
KafkaUtils.createStream
并且不开启WAL的情况下。

数据接受流程

启动Spark Streaming(后续缩写为SS)后,SS 会选择一台Executor 启动ReceiverSupervisor,并且标记为Active状态。接着按如下步骤处理:
  1. ReceiverSupervisor会启动对应的Receiver(这里是KafkaReceiver)
  2. KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用 ReceiverSupervisor.store 方法填充数据,注意,这里是一条一条填充的。
  3. ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。
到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator 中。

BlockGenerator 存储结构

BlockGenerator 会复杂些,这里有几个点,
  1. 维护了一个缓存 currentBuffer ,就是一个无限长度的ArrayBuffer。currentBuffer  并不会被复用,而是每次都会新建,然后把老的对象直接封装成Block,BlockGenerator会负责保证currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate。这个是Spark内存控制的第一道防线,填充currentBuffer 是阻塞的,消费Kafka的线程直接做填充。
  2. 维护了一个 blocksForPushing 队列, size 默认为10个(1.5.1版本),可通过 spark.streaming.blockQueueSize 进行配置。该队列主要用来实现生产-消费模式。每个元素其实是一个currentBuffer形成的block。
  3. blockIntervalTimer 是一个定时器。其实是一个生产者,负责将currentBuffer 的数据放到 blocksForPushing 中。通过参数 spark.streaming.blockInterval 设置,默认为200ms。放的方式很简单,直接把currentBuffer做为Block的数据源。这就是为什么currentBuffer不会被复用。
  4. blockPushingThread 也是一个定时器,负责将Block从blocksForPushing取出来,然后交给BlockManagerBasedBlockHandler.storeBlock 方法。10毫秒会取一次,不可配置。到这一步,才真的将数据放到了Spark的BlockManager中。
步骤描述完了,我们看看有哪些值得注意的地方。
currentBuffer
首先自然要说下currentBuffer,如果200ms期间你从Kafka接受的数据足够大,则足以把内存承包了。而且currentBuffer使用的并不是spark的storage内存,而是有限的用于运算存储的内存。 默认应该是 heap*0.4。除了把内存搞爆掉了,还有一个是GC。导致receiver所在的Executor 极容易挂掉,处理速度也巨慢。 如果你在SparkUI发现Receiver挂掉了,考虑有没有可能是这个问题。
blocksForPushing
blocksForPushing 这个是作为currentBuffer 和BlockManager之间的中转站。默认存储的数据最大可以达到  10*currentBuffer 大小。一般不打可能,除非你的 spark.streaming.blockInterval 设置的比10ms 还小,官方推荐最小也要设置成 50ms,你就不要搞对抗了。所以这块不用太担心。
blockPushingThread
blockPushingThread 负责从 blocksForPushing 获取数据,并且写入 BlockManager 。这里很蛋疼的事情是,blockPushingThread只写他自己所在的Executor的 blockManager,也就是每个batch周期的数据都会被 一个Executor给扛住了。 这是导致内存被撑爆的最大风险。 也就是说,每个batch周期接受到的数据最好不要超过接受Executor的内存(Storage)的一半。否则有你受的。我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了。  建议Spark-Streaming团队最好是能将数据写入到多个BlockManager上。
StorageLevel 的配置问题
另外还有几个值得注意的问题:
  • 如果你配置成Memory_Disk ,如果Receiver所在的Executor一旦挂掉,你也歇菜了,整个Spark Streaming作业会失败。失败的原因是一部分block找不到了。
  • 如果你配置成Memory_Disk_2,数据会被replication到不同的节点。一般而言不会出现作业失败或者丢数据。但解决不了Receiver也容易挂的问题,当然还是主要还是内存引起的。
  • 最好是采用默认设置 MEMORY_AND_DISK_SER_2 比较靠谱些。
  • 这里面还有一个风险点就是,如果某个batch processing延迟了,那么对应的BlockManager的数据不会被释放,然后下一个batch的数据还在进,也会加重内存问题。

动态控制消费速率以及相关论文

另外,spark的消费速度可以设置上限以外,亦可以根据processing time 来动态调整。通过 spark.streaming.backpressure.enabled 设置为true 可以打开。算法的论文可参考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing  ,还是有用的,我现在也都开启着。
Spark里除了这个 Dynamic,还有一个就是Dynamic Allocation,也就是Executor数量会根据资源使用情况,自动伸缩。我其实蛮喜欢Spark这个特色的。具体的可以查找下相关设计文档。

后话

接下来一篇文章会讲一些解决方案。
目录
相关文章
|
9月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
8月前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
172 0
|
12月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
576 58
|
11月前
|
存储 编译器 数据处理
C 语言结构体与位域:高效数据组织与内存优化
C语言中的结构体与位域是实现高效数据组织和内存优化的重要工具。结构体允许将不同类型的数据组合成一个整体,而位域则进一步允许对结构体成员的位进行精细控制,以节省内存空间。两者结合使用,可在嵌入式系统等资源受限环境中发挥巨大作用。
329 12
|
12月前
|
监控 算法 应用服务中间件
“四两拨千斤” —— 1.2MB 数据如何吃掉 10GB 内存
一个特殊请求引发服务器内存用量暴涨进而导致进程 OOM 的惨案。
231 14
|
12月前
|
存储
共用体在内存中如何存储数据
共用体(Union)在内存中为所有成员分配同一段内存空间,大小等于最大成员所需的空间。这意味着所有成员共享同一块内存,但同一时间只能存储其中一个成员的数据,无法同时保存多个成员的值。
|
12月前
|
存储 C语言
数据在内存中的存储方式
本文介绍了计算机中整数和浮点数的存储方式,包括整数的原码、反码、补码,以及浮点数的IEEE754标准存储格式。同时,探讨了大小端字节序的概念及其判断方法,通过实例代码展示了这些概念的实际应用。
808 1
|
12月前
|
监控 Java easyexcel
面试官:POI大量数据读取内存溢出?如何解决?
【10月更文挑战第14天】 在处理大量数据时,使用Apache POI库读取Excel文件可能会导致内存溢出的问题。这是因为POI在读取Excel文件时,会将整个文档加载到内存中,如果文件过大,就会消耗大量内存。以下是一些解决这一问题的策略:
1388 1
|
11月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
217 0
|
3月前
|
存储
阿里云轻量应用服务器收费标准价格表:200Mbps带宽、CPU内存及存储配置详解
阿里云香港轻量应用服务器,200Mbps带宽,免备案,支持多IP及国际线路,月租25元起,年付享8.5折优惠,适用于网站、应用等多种场景。
829 0