<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont

本文涉及的产品
转发路由器TR,750小时连接 100GB跨地域
简介: Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。checkpointcheckpoint 是个很好的恢复机制。

Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。

checkpoint


checkpoint 是个很好的恢复机制。但是方案比较粗暴,直接通过序列化的机制写入到文件系统,导致代码变更和配置变更无法生效。实际场景是升级往往比系统崩溃的频率高太多。但是升级需要能够无缝的衔接上一次的偏移量。所以spark streaming在无法容忍数据有丢失的情况下,你需要自己记录偏移量,然后从上一次进行恢复。



我们目前是重写了相关的代码,每次记录偏移量,不过只有在升级的时候才会读取自己记录的偏移量,其他情况都是依然采用checkpoint机制。


Kafka

这个和Spark Streaming相关,也不太相关。说相关是因为Spark 对很多异常处理比较简单。很多是和Kafka配置相关的。我举个例子:

如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
对应的错误会从这行代码抛出:
if (!iter.hasNext) {
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
 finished = true
 null.asInstanceOf[R]
}


其实就是消费的完成后 实际的消费数据量和预先估计的量不一致。
你在日志中看到的信息其实是这个代码答应出来的:
private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
 s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
 s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +    " This should not happen, and indicates that messages may have been lost"


解决办法自然是把 fetch.message.max.bytes 设置大些。


如果你使用Spark Streaming去追数据,从头开始消费kafka,而Kafka因为某种原因,老数据快速的被清理掉,也会引发OffsetOutOfRangeException错误。并且使得Spark Streaming程序异常的终止。


解决办法是事先记录kafka偏移量和时间的关系(可以隔几秒记录一次),然后根据时间找到一个较大的偏移量开始消费。

或者你根据目前Kafka新增数据的消费速度,给smallest获取到的偏移量再加一个较大的值,避免出现Spark Streaming 在fetch的时候数据不存在的情况。


textFileStream

其实使用textFileStream 的人应该也不少。因为可以很方便的监控HDFS上某个文件夹下的文件,并且进行计算。这里我们遇到的一个问题是,如果底层比如是压缩文件,遇到有顺坏的文件,你是跳不过去的,直接会让Spark Streaming 异常退出。 官方并没有提供合适的方式让你跳过损坏的文件。我们目前是通过重写FileInputDStream 等相关类来修正该问题。


内存

Shuffle (尤其是每个周期数据量很大的情况)是Spark Streaming 不可避免的疼痛。譬如,与Kafka的集成, Kafka的分区数决定了你的并行度(我们假设你使用Direct Approach的模式集成)。你为了获得更大的并行度,则需要进行一次repatition。 为了能够避免Shuffle,并且提高Spark Streaming处理的并行度,我们重写了DirectKafkaInputDStream,KafkaRDD,KafkaUtils等类,实现可以按Kafka 分区按倍数扩大并行度。

我们期望官方能够实现将一个Kafka的partition 映射为多个Spark 的partition,避免数据的多次移动。


再次,如果单个Executor 并行度过大,可能也会导致对内存压力增大。在使用Spark Streaming的过程中,我们多次遇到Executor Lost 相关的问题(譬如 shuffle fetch 失败,Task失败重试等),目前比较有效的方式是:

提高Executor 数目
减少单个Executor的 CPU 核数

为了保证处理的效率,请保证CPU总核数保持不变。


监控

Spark Streaming 的UI 上的Executors Tab缺少一个最大的监控,就是Worker内存GC详情。虽然我们可以将这些信息导入到 第三方监控中,然而终究是不如在 Spark UI上展现更加方便。 为此我们也将该功能列入研发计划。

目录
相关文章
|
Web App开发 前端开发
|
Web App开发 存储 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
1.HBase依赖于HDFS,HBase按照列族将数据存储在不同的hdfs文件中;MongoDB直接存储在本地磁盘中,MongoDB不分列,整个文档都存储在一个(或者说一组)文件中 (存储) 2.
717 0
|
存储 监控 数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
为首次部署MongoDB做好准备:容量计划和监控 作者Mat Keep ,译者孙镜涛如果你已经完成了自己新的MongoDB应用程序的开发,并且现在正准备将它部署进产品中,那么你和你的运营团队需要讨论一些关键的问题: 最佳部署实践是什么? 为了确保应用程序满足它所必须的服务层次我们需要监控哪些关键指标? 如何能够确定添加分片的时机? 有哪些工具可以对数据库进行备份和恢复? 怎样才能安全地访问所有新的实时大数据? 本文介绍了硬件选择、扩展、HA和监控。
2589 0
|
Web App开发 Apache
|
Web App开发 前端开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
服务端需在vm arguments一栏下加上    -agentlib:jdwp=transport=dt_socket,server=y,address=8000 并以run模式启动 如果以debug模式启动服务端...
714 0
|
Web App开发 前端开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
 Connection reset by peer的常见原因: 1)服务器的并发连接数超过了其承载量,服务器会将其中一些连接关闭;    如果知道实际连接服务器的并发客户数没有超过服务器的承载量,看下有没有网络流量异常。
852 0
|
Web App开发 前端开发 程序员
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
Facebook 内部分享:不论你如何富有,你都赚不到更多的时间,你也回不到过去。没有那么多的假如,只有指针滴答的时光飞逝和你应该好好把握的现在,以下25张PPT的分享将为您带来时间价值管理的技巧。
607 0
|
Web App开发 前端开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
kafka.common.ConsumerRebalanceFailedException: group_dd-1446432618163-2746a209 can't rebalance after 10 retries  at kafka.
803 0

热门文章

最新文章