<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont

本文涉及的产品
转发路由器TR,750小时连接 100GB跨地域
简介: Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。checkpointcheckpoint 是个很好的恢复机制。

Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。

checkpoint


checkpoint 是个很好的恢复机制。但是方案比较粗暴,直接通过序列化的机制写入到文件系统,导致代码变更和配置变更无法生效。实际场景是升级往往比系统崩溃的频率高太多。但是升级需要能够无缝的衔接上一次的偏移量。所以spark streaming在无法容忍数据有丢失的情况下,你需要自己记录偏移量,然后从上一次进行恢复。



我们目前是重写了相关的代码,每次记录偏移量,不过只有在升级的时候才会读取自己记录的偏移量,其他情况都是依然采用checkpoint机制。


Kafka

这个和Spark Streaming相关,也不太相关。说相关是因为Spark 对很多异常处理比较简单。很多是和Kafka配置相关的。我举个例子:

如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
对应的错误会从这行代码抛出:
if (!iter.hasNext) {
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
 finished = true
 null.asInstanceOf[R]
}


其实就是消费的完成后 实际的消费数据量和预先估计的量不一致。
你在日志中看到的信息其实是这个代码答应出来的:
private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
 s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
 s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +    " This should not happen, and indicates that messages may have been lost"


解决办法自然是把 fetch.message.max.bytes 设置大些。


如果你使用Spark Streaming去追数据,从头开始消费kafka,而Kafka因为某种原因,老数据快速的被清理掉,也会引发OffsetOutOfRangeException错误。并且使得Spark Streaming程序异常的终止。


解决办法是事先记录kafka偏移量和时间的关系(可以隔几秒记录一次),然后根据时间找到一个较大的偏移量开始消费。

或者你根据目前Kafka新增数据的消费速度,给smallest获取到的偏移量再加一个较大的值,避免出现Spark Streaming 在fetch的时候数据不存在的情况。


textFileStream

其实使用textFileStream 的人应该也不少。因为可以很方便的监控HDFS上某个文件夹下的文件,并且进行计算。这里我们遇到的一个问题是,如果底层比如是压缩文件,遇到有顺坏的文件,你是跳不过去的,直接会让Spark Streaming 异常退出。 官方并没有提供合适的方式让你跳过损坏的文件。我们目前是通过重写FileInputDStream 等相关类来修正该问题。


内存

Shuffle (尤其是每个周期数据量很大的情况)是Spark Streaming 不可避免的疼痛。譬如,与Kafka的集成, Kafka的分区数决定了你的并行度(我们假设你使用Direct Approach的模式集成)。你为了获得更大的并行度,则需要进行一次repatition。 为了能够避免Shuffle,并且提高Spark Streaming处理的并行度,我们重写了DirectKafkaInputDStream,KafkaRDD,KafkaUtils等类,实现可以按Kafka 分区按倍数扩大并行度。

我们期望官方能够实现将一个Kafka的partition 映射为多个Spark 的partition,避免数据的多次移动。


再次,如果单个Executor 并行度过大,可能也会导致对内存压力增大。在使用Spark Streaming的过程中,我们多次遇到Executor Lost 相关的问题(譬如 shuffle fetch 失败,Task失败重试等),目前比较有效的方式是:

提高Executor 数目
减少单个Executor的 CPU 核数

为了保证处理的效率,请保证CPU总核数保持不变。


监控

Spark Streaming 的UI 上的Executors Tab缺少一个最大的监控,就是Worker内存GC详情。虽然我们可以将这些信息导入到 第三方监控中,然而终究是不如在 Spark UI上展现更加方便。 为此我们也将该功能列入研发计划。

目录
相关文章
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
总结和计划总是让人喜悦或镇痛,一方面以前一段时间没有荒废,能给现在的行动以信心,另一方面看到一年的时间并不能完成很多事情,需要抓紧时间。
640 0
|
Web App开发 监控 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
系统的升级涉及各个架构组件,细节很多。常年累月的修修补补使老系统积累了很多问题。 系统升级则意味着需要repair之前埋下的雷,那为何还要升级,可以考虑以下几个方面 成熟老系统常见问题: 1. 缺乏文档(这应该是大小公司都存在的问题。
638 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
1.使用lsmod查看ipv6的模块是否被加载。 lsmod | grep ipv6 [root@dmhadoop011 ~]# lsmod | grep ipv6 ipv6                  317340  127 bonding 如果加载了,则进行如下操作: 2.
814 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
【CRM五策略】           对客户进行分类,不是根据规模,而是根据和你的关系,越细腻越好;           不定期更新客户资料,信息越全面越好;           主动对客户进行关怀,拿出你的诚意和...
656 0
|
Web App开发 存储 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
一、引言   最近在整理理大数据模式下的数据仓库数据模型,资料来自互联网和读过的数据仓库理论和实践相关。 二、3NF (1)1NF-无重复的列   数据库表的每一列都是不可分割的基本数据项,同一列中不能有多个值,即实体中的某个属性不能有多个值或者不能有重复的属性。
757 0
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
在上一期的专栏文章中,我们曾经提到:数据分析系统的总体架构分为四个部分 —— 源系统、数据仓库、多维数据库、客户端(图一:pic1.bmp) 其中,数据仓库(DW)起到了数据大集中的作用。
1163 0
|
Web App开发 前端开发 Linux
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
nproc是操作系统级别对每个用户创建的进程数的限制,在Linux下运行多线程时,每个线程的实现其实是一个轻量级的进程,对应的术语是:light weight process(LWP)。
1182 0
|
Web App开发 监控 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
zookeeper的maxSessionTimeout默认值导致hbase regionserver超时 在hbase中经常会遇到regionserver挂掉的情况,查看日志会看到这样的错误信息 2016-02-16 11:51:24,882 WARN  [master/hadoop02/192.
777 0
|
Web App开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
                                                                                序列化对单例的破坏 本文将通过实例+阅读Java源码的方式介绍序列化是如何破坏单例模式的,以及如何避免序列化对单例的破坏。
952 0
|
SQL Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
在运行一个group by的sql时,抛出以下错误信息: Task with the most failures(4):  -----Task ID:  task_201411191723_723592_m_000004URL:  http://DDS0204.
1009 0