<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont

本文涉及的产品
转发路由器TR,750小时连接 100GB跨地域
简介: 最近在使用Spark Streaming过程中,对foreachRDD有点疑问,查阅资料后记录如下:foreachRDD(func)的官方解释为The most generic output operator ...
 
 

最近在使用Spark Streaming过程中,对foreachRDD有点疑问,查阅资料后记录如下:

foreachRDD(func)的官方解释为

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

对于这个定义会产生一个疑问:在一个batch interval里面会产生几个RDD? 结论:有且只有一个

那么定义里面所说的“each RDD”应该如何理解呢?

DStream可以理解为是基于时间的,即每个interval产生一个RDD,所以如果以时间为轴,每隔一段时间就会产生一个RDD,那么定义中的“each RDD”应该理解为每个interval的RDD,而不是一个interval中的每个RDD

从spark的源码分析

DStream中的foreachRDD方法最终会调用如下的代码

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

可以看到这个方法里面并没有任何的Iterator,可以对比一下RDD中的foreachPartitionforeach方法,这两个方法是会遍历RDD,所以才会有Iterator类型的引用

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

而如果每个interval中有多个RDD,那么DStream中的foreachRDD也一定会有Iterator类型的引用,但是从上述的代码中并没有。



作者:Woople
链接:http://www.jianshu.com/p/9116043b0c21

目录
相关文章
|
Web App开发 前端开发 Java
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
线程的状态有:new、runnable、running、waiting、timed_waiting、blocked、dead 当执行new Thread(Runnabler)后,新创建出来的线程处于new状态,这种线程不可能执行 当执行thread.start()后,线程处于runnable状态,这种情况下只要得到CPU,就可以开始执行了。
836 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
总结和计划总是让人喜悦或镇痛,一方面以前一段时间没有荒废,能给现在的行动以信心,另一方面看到一年的时间并不能完成很多事情,需要抓紧时间。
725 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
Every Programmer Should Know These Latency Numbers 1秒=1000毫秒(ms) 1秒=1,000,000 微秒(μs) 1秒=1,000,000,000 纳秒(ns) 1秒=1,000,000,000,000 皮秒(ps) L1 cache reference .
722 0
|
Web App开发 数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
可伸缩系统的架构经验 Feb 27th, 2013 | Comments 最近,阅读了Will Larson的文章Introduction to Architecting System for Scale,感觉很有价值。
2428 0
|
Web App开发 前端开发 Linux
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
 频繁的文件访问会导致系统的Cache使用量大增   $ free -m   total used free shared buffers cached   Mem: 3955 3926 28 0 55 3459   -...
707 0
|
Web App开发 前端开发 Java
|
新零售 监控
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
千万级规模高性能、高并发的网络架构经验分享 主 题 :INTO100沙龙时间 :2015年11月21日下午地点 :梦想加联合办公空间分享人:卫向军(毕业于北京邮电大学,现任微博平台架构师,先后在微软、金山云、新浪微博从事技术研发工作,专注于系统架构设计、音视频通讯系统、分布式文件系统和数据挖掘等领域。
1362 0
|
Web App开发 前端开发
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
一个典型的星型模式包括一个大型的事实表和一组逻辑上围绕这个事实表的维度表。  事实表是星型模型的核心,事实表由主键和度量数据两部分组成。
603 0

热门文章

最新文章