开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:storm&spark1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/75/detail/15843
storm&spark1
内容介绍
一、storm
二、spark
一、storm
1.与 Hadoop 的比较
上节课学了 Hadoop,了解到了定型的框架是如何处理的,但是 Hadoop 属于一种框架,总的来讲属于一种批处理方式,因为处理的方式可以看这张原始图
实际上输入的数据已经在最左侧了,是因为数据太大,在实际的处理过程中会被切分成很多不同的小块,但是总体数据已经在这里了,看到的是文件
所以将它切开以后,每一个 map 处理一部分数据,Reduce 也是同样的,这里画的是一个,实际上可能有两个,两个reduce 也是各处理一部分数据,所以在前面看到的 Hadoop 这种 map reduce 框架之下,处理的数据全部在里面,一开始都准备好了,以块儿的方式切开再进行处理,所以说它是批处理方式。与它相对应的是 storm 这样的框架,Storm 眼里看到的数据是没有边界的,无穷无尽一直在过来像流一样的数据,
举例:Storm 本身是从推特衍生出来的,在推特里面会发许多推文,推文时时刻刻都在发,有大量的人都在发推文,如果这时想做一个处理,分析推文中在推特的人的情绪,比如现在新冠肺炎,每天都有很多的新闻,大家看到这些新闻后都会在推特上发推描述态度,如果想做这样的处理会发现与 Hadoop 里所有的数据已经具备了放在那里是文件不同,Storm 是数据像流一样,不断的产生不断的有新数据进来,要不断的进行处理,在这种情形下,用 hadoop 是不可以的,Hadoop 是所有数据具备以后直接切块再去做,现在的情况是数据源源不断的更新再做处理。
再举一个例子,在学校的十字路口安装了很多的摄像头,想知道汽车是否超速,这时可以看到有大量的视频监控数据出现,要不断的进行处理,可以后台有一些机器有分布式的集群在不断的做实时处理,要及时的发现视频中超速的汽车,这种情况也不是 Hadoop map reduce 批处理方式可以进行处理的,所以 storm 提出来的处理的事务是由推特开发出来的,所以他能够处理的是向推文这样的流式数据,这是 storm 与 Hadoop 最大的差异。有这样的特性也可以看到同样是在分布式集群里面,数据流不断的过来,在不断的做实时处理,所以看起来效率蛮高的,适合做实时分析,在线的机器学习,连续性持续型计算,分布式的远程方法调用,数据清洗等等,这是 storm 的结构。
2.Storm 特性
因为用了分布式系统,所以比较快,在分布式的系统里之前讲过有很多机器除了快,另外可扩展,可扩展在理想的情况下是一台机器可以处理100个用户,两台机器就可以处理两百个用户,但是实际上可能达不到,只能处理190个,系统如果开发的比较差,两台机器也可能只能服务于160个,Storm 的共性就比较好,因为在集训中所以容错,这是storm 的特性。
3.结构
结构是在处理中,Storm 也可以做 Word count,给 storm 发送一些推文,数一数里面的不同单词有多少个,在计算过程里面需要 top,他把计算定义成 top logy,Top 对应的是 Hadoop 里看到的 job,预先开始要定义好,里面会有数据源去抓取所有的推文,数据源叫做 spout,比较形象像喷嘴一样把数据喷出来,出来之后后面的任何一个处理都叫做bolt,Boss 要写针对过来的数据,些数据以元组的方式发过来一般是一个键值对,以大量元组的方式发过来,发过来以后 bolt 做数据的处理,例如先解析推文,推文里面包含很多的单词将其断开在做标点符号的去除,人称和时态的统一,比如第三人称单数和原型统一成一个词,处理完以后再继续传递的就是一些断开的词,所以仍然是 taple,在针对 taple 里面不同的k计算出现了多少条数据,这样就得到了统计的结果。过程与之前的 word count 一样,但是有两个区别,第一个区别是数据过来以后不是一次性给这么多数据都拿过来处理,处理不过来时进行切块,而是有源源不断的数据流进来,源源不断的在经历这个过程做统计,这边会有大量用户的推文,对这些推文做统计,例如按照用户类型做一段时间的统计发送所有推文的 word count,整个结构被称为计算的 top,Top 包含产生数据的数据源,和bolt做计算处理过程,与 Hadoop 的差异是没有明确的讲有什么 map 或者 reduce,在里面做数据处理的是bolt,这些bolt可以形成 top,Top 是一个完整的作业。在这里可以看到,如果输入的数据源非常多,在推特上非常多,也可以用一组集群做,例如第一个是中国的,第二个是美国的,第三个处理欧洲的用户,最后一个处理大洋洲的用户,他们各自产生的数据发送到后台,后台的bolt可以有多个实例分别去做处理,处理完成以后,还可以把处理结果发到后面又是一组集群统一单词数量去处理,在这个过程中与之前谈到的 map reduce 发给 map 的结果 reduce 去处理是一样的,不一定是一对一的关系,每一个 bolt 的结果可能一部分会给第一个 bolt 实例,一部分给第二个,下一个 bolt 实例,构成这样的情况,这样就有了群组的关系,这些被认为 group,统一在完成 parse tweet 的动作,产生的结果会发送给下一个 group,这是在 storm 中发生的
在 storm 中可以看到有 nimbus,nimbus 的角色就像前面看到的在 Hadoop 里面的 jobtracor,他在跟踪整个系统当前的状态包括硬件的状态以及任务,zookeeper 与前面看到的资源管理器一样的角色, 在管理后面一组机器,每一个机器上都包含 supervisor,最右边是其中的一个机器展开的样子,Supervisor 在管理真正物理节点上的运行的各种各样的 worker,展开看实际上里面会有 executors,executor 在真正执行任务,一个机器上可以有若干个 worker,他们可以并行的执行多个任务,这是 storm 一个大的结构。现在的东西叫做 top,Top 对应的东西是 map reduce 中的 job,他们作用是类似的,但是不一样的点是 top 处理的流数据,所以 top 在处理数据时是一直在处理的,不停,除非人为的停下来,而 map reduce 不一样,在接收一批数据时,针对输入进行处理,最终会停下来,这就是 map reduce和top的差异,所以 top messenger forever 是流处理,以为 Eventually finished 是批处理,在集群中 nimbus 类似于Hadoop中的 jobtracker,Master 是主节点,管理整个集群中的机器,要考虑如何分配资源,考虑如何把资源分配到各个机器,还要了解任务或节点是否有失败,在每一个物理节点,也就是 worker 节点上有一个守护县城 supervisor,Supervisor监听 nimbus 发配给这台机器的任务到底执行的怎么样,执行这些任务需要启动 worker 进程进行处理,关于他的启动和停止都是依靠 supervisor 处理,Supervisor 接收到 nimbus 给的任务,在这台机器机器上就启动相关的进程,不断的查看执行的效果怎样,实际上会将结果汇报给 nimbus,nimbus 是主导全局性的所有节点本身如何,以及任务执行的效果如何,真正的 worker Process,也就是 supervisor 启动的进程才是在执行任务,实际上其中执行的 top 只是部分,一个 bolt 或者两个 bolt,整个完整的 top 可能是在跨多个机器的多个 worker 进程中执行的,此机器执行一个 bolt,传递给下一个机器另外一个 bolt,是这样去执行的,这就是大家看到的结构。Top 是看到的计算,将所有的 bolt 和 sport 合 到一起化成 top 化成一张计算图,其中计算图就包含了整个计算的过程,计算图里的每一个节点都在执行一定的处理逻辑,这些节点之间的连接表示数据应该在这些节点中传递,就像上面看到的图一样,是从 port 传给 bolt,再传递给下一个 bolt,如果启动复杂带分支,会有相应的判断条件,是并行还是分支,要编写一个比较复杂的 top,整个storm 的核心是流,流是不加限定的,没有边界的元组序列,数据会源源不断的涌现,源源不断,首先强调的是没有边界,其次强调的是不是一起来的,在做处理时不能向Hadoop一样一开始就有一个已经放在里面的数据,可能是一直不断的来,一直有,而且可能单看数据量并不大,但是在一段时间里发送过来的所有的 tuple 合起来发现尺寸可能是大的,这就是我们能够看到的东西,每一个 bolt 做的事情是不断处理流将它转换成新的流,不断的转换不断的转换,最后就得到了转换的最终结果,就像在视频流中转换超速的不断地处理,把视频流最终映射成超速的人的帧测结果信息,这个信息本身也像流一样不断的出现。
用到的 storm 中产生数据源的是 spout 以及做数据转换的 bolt,它们之间最终要形成有项无环图,因为有环肯定是不对的,形成有向无环图,最终从 bolt 中产生输出,或者底下的 bolt 产生两种输出,上面的 spout 是视频流,下面的 spou t 是在不断的读取人员流,例如每一个人身上有 ID,在经过一些路口时可以扫描到这些人是谁,上面的 spout直接从视频中获取,下面的 bolt 出来的就是这些人路过这些路口时穿着,穿了些什么衣服,要获得视频以及人的 ID,这两个合起来就可以知道人的衣着了,视频传递过来以后,bolt 在抽取人的体型,另一边抽取它的宿主,最终产生的是它是不是超速这样的结果。在这里看到有两个 spout,产生两种不同输出的计算图,不论如何,必须是有向无环图,不能有环形的依赖,如果有循环依赖图肯定会处理。
spout 实际是数据流的来源,bolt 是在接受一些事务,可以只有一个 spout,,也可以接受两个 spout 语言,可以接受任何事务的数据流做一些处理,会产生一些新的流作为结果输出,这是 bolt 的结构。最终看到的整个所谓的spout里面的 storm 中的 top 就是由 spout 和 bolt 构成的网络,在里面不断的做执行,这就是 storm 中的情形。在做处理时
一个 spout 或者 bolt 本身处理的工作如果比较大的情况下,可以做一下群,这样会看到有一组实例共同完成 bolt指定动作,spout 在拿到数据以后要发给 A bolt 中所有的实例,同时要发给 C,A 在处理完成之后,将结果发给下一个bolt 的所有的实例,稍后再 spark 中这个是宽依赖,如果做得好还有一种情况是,每个 bolt 都会有相应的输出,一条一条相对应起来,每一个大概只有一个依赖,这样对应的是窄依赖,窄依赖显然性能会高一点,对设计要求也会高一些,所以将此合起来的原因是希望并行,处理很多任务时并行可以提高性能。
4.例子
public class ExclamationTopologyextends ConfigurableTopology{
public static void main(String[] args) throws Exception{
ConfigurableTopologystart(new ExclamationTopology()args);
}
protected int run(String[] args){
TopologyBuilder builder=newTopologyBuilder();
buildersetSpout("word"newTestWordSpout10);
builder.setBolt("exclaim1",new ExclamationBolt()3)shuffleGrouping("word);
buildersetBolt("exclaim2",newExclamationBolt()2)shuffleGrouping("exclaim1")
confsetDebug(true);
String topologyName="test";
confsetNumWorkers(3);
if(args!=null &&args.length >0){ topologyName=args[0];
}
return submit(topologyNameconfbuilder);
public static class ExclamationBolt extends BaseRichBolt{ OutputCollector collector;
@Override
public void prepare(Map<String,Object>confTopologyContext context
OutputCollector collector){ this.collector=collector
}
@Override
public void execute(Tuple tuple){
collectoremit(tuple,new Values(tuplegetString(0)+"!!!")) collectorack(tuple);}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){ declarerdeclare(newFields("word"));
}
}
}
这个是在 storm 官网给出的例子,实际上这个例子大家看到后面是由很多的代码,这里只是举了其中一个,如果要跑这个测试配置的都没有问题,会是正确的。今天讲解一下这个例子,刚才的 top 是什么意思,在 storm 中,如果自己想写 top,要写像 Hadoop 中的 job,有一个叫做 ConfigurableTopology 这样的类去扩展它,扩展它的目的是要调用此类中的 start 方法,将当前对象跑起来,new 对象出来然后跑一下,这样 start 就跑起来了,跑的时候很显然是跑了一个线程,线程在前面讲解多线程时谈到过,线程中有一个很重要的方法 run,进行了封装,可以实现逻辑,前面讲所有的Topology 都是一个 top,要去建立这个 top,所以引入 Topology builder 对象建立 top,使用 T opology builder 相应接口的 setspout 或者 setbolt 方法将前面图表达的含义表示出来,在这个我代码中可以看到,先是 setspout,spout 是数据的源,先说明源在哪里,新创建一个源,源的名字叫做 Word,newTestWordSpout 有10个,也就是刚刚讲的实例,下面设置 bolt,bolt 的名字是 exclaim1,创建 bolt 对象有三个实例,这三个和十个的关系是十个都要传输给三个,如果是要用 shuffleGrouping 说清楚前面接的是谁,这里接的是 Word,与这个到底类似,底下仍然是设置 bolt,名字是exclaim2,接在 exclaim1的后面,所以现在看到的是有一个 spout 后面跟了两 个 bolt 这样的结构,刚才是要写多少实例,底下是最终把作业提交给 nimbus,整个要做一个设置,总共有多少个 worker 机器在里面跑,模拟其中有多少进程在跑,最后讲述刚才创建出来的 Topology 按照 conf 配置给整个 Topology 起名字为 test,整个丢给 storm 去执行,使用的是 submit,然后就去跑了,跑了之后 spout 是 newTestWordSpout 实际上它可能比较简单只是读一下文本,就不深入了解了。ExclamationBolt 拿到元组究竟做什么,在底下写一下,bolt 可以扩展 BaseRichBolt 系统中提供的 bolt,就是 storm 自己携带的 bolt,bolt 有一些商用方法,比如在 prepare 中在做相应的一些设置,设置 OutputCollector,关键是 execute,输入过来的是一个一个 tuple,tuple 进入以后会产生一个新的 tuple,拿到新的 tuple 要将值替换一下,值是原来 tuple 里面索引为0的字符串,拿出来以后加上三个叹号,因为e xclamation 就是叹号的意思,将值取出后加三个叹号作为新的值发回去,发过去以后产生了新的 tuple,用产生的新的值替换t uple 里的值,然后再将其发回去,原原本本发回去,这样 tuple 中的内容就被改写了,经过两个 ExclamationBolt 的处理之后,原来从 spout 传入的 tuple 里面包含的单词每一个都会加六个感叹号,这就是 bolt 的作用。
为了跑这个例子,需要先启动nimbus,再启动 supervisor 要在里面做一些配置,storm 在下载完成以后要做一些配置,做配置实际上是在跑时 nimbus 依靠的是 paper,要跑在127.0.0.1中,Supervisor 在里面进行监听,每一个worker machine 上都会有有一个 supervisor,在一台机器上跑端口在那些地方,于是就进行配置处理,会模拟出有若干个 worker1,worker machine,然后将刚才讲解的spout或者bolt分布到这些模拟出来的worker machine上去跑,包括其中有多少个实例就放在上面。
这是跑起来的例子,实际上这个工程中的东西是非常多的,里面其实有很多的 Topology 只是选了其中一个比较容易理解的为大家讲解,其他的大家可以自己去看,这个东西本身可以跑,底下有几个 test 文件可以跑一下,跑之前 storm 要启动起来,启动时要先跑 nimbus,nimbus 跑起来相当于 master 跑起来了,然后跑storm,跑 storm 要跑supervisor,启动以后在端口都可以接收请求,可以跑一个 test 试试看,如果所有东西都配置的正确,跑完没有问题就会说所有示例都通过,就能够得到效果,具体 T opology 写法是举的那一个例子,代码与刚才的一样,创建一个spout,创建两个 bolt,通过描述放在一起构成一个完整的 Topology,然后就可以放入 storm 中去跑了,这个例子告诉大家写代码其实和前面没有差太多,代码的主要内容
buildersetSpout("word"newTestWordSpout10);
builder.setBolt("exclaim1",new ExclamationBolt()3)shuffleGrouping("word);
buildersetBolt("exclaim2",newExclamationBolt()2)shuffleGrouping("exclaim1")
是这里,知道是如何写Topology,怎么由一组spout或者bolt连起来,跑这个例子在这台机器看可能没有太大的优势,因为在一台机器上做,而且例子比较简单,如果有很复杂像视频数据要在集群中跑的时候,才能看出它的优势
这是刚刚跑完大家能够看到的,会看到这样的信息,能去跑每一个 Topology 可以知道里面是什么样子的,但是一定要先启动 spout 和 spark 以及 nimbus。
二、spark
1.原理
刚才看到的是 storm 处理流式数据,Hadoop 处理批处理数据,spark 最早是加州大学 amp 研发的,经过演化慢慢变成了由 Apache 项目之一,在前面的处理中可以看到例如 Hadoop 中,有一个问题,Hadoop 所有的输入都来自于硬盘,在硬盘上包括之前讲如果map处理完,要将结果放到某一个地方,reduce 继续处理,这个地方也在硬盘,会频繁的借助硬盘实现它们的逻辑,而硬盘涉及到计算机I/O操作,这个I/O操作在计算机里面在蓝桥上挂着,速度比北桥慢很多,所以马上会想到如果借助于硬盘做这件事,看起来效率不太高,能不能快一点,想要快就不是硬盘这种I/O,要速度比较快的 I/O,想到的是内存,spark 的想法是应该大量的使用内存,在内存中进行操作,不要把数据都写入硬盘中,包括中间结果,全部都在内存里不要在硬盘上,内存的 IO 速度与硬盘相比至少是数量级的提升,所以速度会快,当然前提是内存的数量,尺寸要大,充分利用内存,spark 除了这点之外,对外支持的语言比较多,但是本身使用Scala 写,Scala 语言比较简洁,写出同样功能的代码比 Java 短,执行效率会高一些,另外一个更重要的是支持函数式编程,函数式编程的意义是写代码以后处理都应该是函数式的,函数式是例如 y=f(x),当给自变量 x 传递一个值时,会计算出来y,但是数学上的函数无论怎么计算得到 y,x 是不变的,x 传进来的值是不会发生变化的,整个计算过程当中不会产生任何变化,Java 代码中肯定不是这样的,不能保证一定都这样,要想做到这一点要依靠自己实现,例如f(x)这个方法,是 int 类型的,传递 int,x=x+1,于是改变了 x 的值,而 Java 中默认的索引X发生变化,函数式编程就不会存在这一点,变量的值是不会发生变化的,反过来讲就是在看到的 Scala 编 写的这种语言中,当定义一个变量之后,实际上在函数进行处理时本身不会发生变化,要想改变X值,要取 x=f(x)这种方式,但是这种方式显然并不好,因为如果改变了x的值,例如x值是一个数组,如果x值不发生变化,可以共享,比如x指首元素位置,里面放的是01234,x 表示的是自然数,y 表示的是正整数,这就不需要再开一个空间去存储123456789了,y指向数组的第一个位置,从一开始,z表示大于二的整数,从一二之间开始,大家可以共享一片空间,又因为 xyz 都不会发生变化,所以其他用到这不用担心y使用时会不会将后面值都改掉了,导致x值也发生变化,满足不会发生变化这个条件就是函数式编程的优点,有了它以后想做这件事情必须使 y=f(x),换句话来讲,当有一个x放到这里,希望对他进行操作修改产生结果的时候不能修改x,例如 x+1,会在内存中重新生产1234,把原来的重新处理一下,x 还是 x,下面赋给 y,这是它的基本原理,就是用 Scala 来写,是函数式编程中很重要的一个特征,spark 首先是函数式编程用 Scala 来写的,效率比较高,而且在内存中因为是函数式编程,所以带来一些额外的好处,其次在充分利用内存,不像 Hadoop 必须依靠硬盘做处理,大量的使用内存,所以速度会变快,进一步变快。效率高,速度快,就是大家现在看到的肯定大规模分布式集群计算快,都在基于它做扩展。
2.简介
要求在内存中做,本质上是内存计算的东西,有多样的用途,Sparta core 支持在内存里做类似于 map reduce 这样的批处理的动作
在此基础上还有 spark SQL 如果在内存中出现的是 SQL 这样结构化的数据,支持执行 SQL 语句,第二个是流式处理,流式处理与刚才的 storm 类似,其实它构建在类似 map reduce 也就是 Hadoop 之上这种批处理,因为从宏观看是流处理,从微观看一定是一个批处理,在一个很巧的时间段内拿到了一批数据去做处理,但是这个时间段比较小从宏观上看好像是数据在源源不断的再做处理,streaming 东西类似于批处理的东西,第三个是机器学习的库,第四个是处理图数据的库,由节点或者编构成的数据库,所以在 Hadoop 和 spark 上跑逻辑回归算法可以看到差异非常大
一个用硬盘一个用内存,两者实现的语言也不太一样,所以差异非常大,所以 spark 在很多实际看到的用户里面都在在用 spark
3.spark 和 Hadoop 的比较
spark 底层仍然可以构建在Hadoop之上,但是 spark 实际上再用内存,map reduce 没有充分利用这些内存,大量的全是硬盘操作,硬盘操作实际上在读进来,map 进行处理时,假设有10G,将1P的数据切成100块,每块都是10G,map 在进行处理时内存显然没有这么大全装内存,所以是读一块数据进来,就把它写到硬盘上,再读1G 在写硬盘,在读1G 处理之后再写到硬盘上,比如在统计10G 里面不同单词有多少个,所以读一G 处理结果写出来,读一G 处理结果写出来,写完十个为止,这十个是 spill 文件,相当于一碗水装满了溢出来,内存不够溢出来放到硬盘上,这十个文件放到这里之后,硬盘放满了之后说将文件七分以下切成两份,一份放到某一个地方,另一份放到另外一个地方,然后现在有两个 reduce 在读数据,所以大家可以看到在频繁的切,首先数据在硬盘上,输出以后也在硬盘上,中间处理的结果在大量的写入硬盘,所以效率非常低,但是如果这些东西都发生在内存里,速度就很快了,因为内存本身速度就快,再加上内存是线性的,不像硬盘是三维的,访问起来效率本来就比较差。
现在看一个 s park 官网给的例子,如果有一个日志文件,将他 log 在内存里,想在日志文件里过滤到不需要的东西,只把有关错误的消息拿出来,因为在前面开发时,大家也可以看到,如果跑 storm 或者 tomcat 这些应用,日志文件其实很长很长,包括系统自己写的,也包括编程时写进去的东西,但是只关心长的有错误的信息,假设有四台机器,构成了一个 spark 集群,集群中有一个作为主的,另外三个作为 worker 执行任务,首先写他的代码
lines=sparktextFile(“hdfs://..")
errors =lines.filter(_.startswith("ERROR)) messages = errors.map(_.split('\t’)(2)) cachedMsgs =messages.cache(
cachedMsgs.filter(_.contains(“foo")).count cachedMsgs.filter(_.contains(“bar")).count
代码是 spark Scala,看起来很像 JAVA,但是是Scala写的,意思是首先要加载文件进来,Spark 是系统提供的Spark对象,Text file 是读取文件方法,括号里面是文件的内容,是 url,底层仍然是可以基于分布式系统➕map reduce这样的框架,底下仍然是一个分布式文件系统,加载了一个文件,加载完以后到了内存里是一行一行的内容,文本文件中一行一行的内容就读取进来了,组成了 lines 这样的东西,lines 放在内存中,内存中所有的对象叫做 rdd 弹性分布式数据集,所以弹性分布式数据集可以简单理解为就一个内存中像 key value 组成的集合对一样,就是这样的一个数据集,但是有一个特点是不能修改的,在数据集 rdd 中里面有很多很多的操作,其中有 filter,Filter 是将 lines 里面符合标准的原组拿出来,将 key value 拿出来,标准是以 error 开头的那些行,拿出来以后 r dd 的基本原则是不能改,所以不能将 lines 改掉,过滤出来以后赋给 errors 这样的 rdd,这时相当于对 lines 的 rdd 做了一次转换,将其一部分内容过滤出来生成新的 errors rdd,对此 rdd 就可以做相应的操作了,例如大家熟悉的 map,这一行的消息对于 map 来讲不能整体处理,要将其断开,使用指定的字符断开,断开以后的结果同样要放到新的rdd里,messages rdd,现在总共有三个 rdd,Message 里面其实是断开以后,是以error开头的,所以断开以后第一个就是 error,第二个是在error后面跟着的东西,将其取出来,到 messenger 中去,这个 message 可能不断的去使用,因为前面三行代码全部都在内存里,内存不够了,可能会删掉一些内容,Spark 源里面可能会把某些东西写入硬盘中,但是 message 是后面计算会频繁使用的,不应该将其写入硬盘上,如果因为内存不够 spark 将其写在硬盘上,操作系统的原理就是内存不够写入硬盘,Spark 也是一样的,如果内存不够,将 messenger 写到硬盘上内存空出来就没有意义了,所以会告诉 spark 无论如何都要将 message 写入内存上,要 cache 住,然后看到了 cache 住以后的 rdd 一定在内存里,之后对其再进行操作过滤,spark 想要的东西计算其数量,一旦 count 会在多彩机器上做并行操作,cache messages 这个 rdd 可能很大,分了三块儿,在三台机器上,分别执行 count 这个动作,然后将结果返回给 driver,Driver 最后拿到统计的结果,
这三块其实都被 cache 住了,都在这个集群的内存中 cache 了,所以是在内存中发生的,一定会保证是在内存中执行的操作,如果再执行一次过滤再计算,仍然是执行相同的动作,但是会在 cache 住的内存里的rdd中进行操作,就没有在硬盘中 load 数据了,在维基百科中做全文搜索,速度可以小于一 sec。