开发者学堂课程【高校精品课-上海交通大学-企业级应用体系架构:】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/75/detail/15844
storm&spark2
内容介绍
一、spark
二、Spark 程序流程示例
一、spark
1.原理
lines=sparktextFile(“hdfs://..")
errors =lines.filter(_.startswith("ERROR)) messages = errors.map(_.split('\t’)(2)) cachedMsgs =messages.cache(
cachedMsgs.filter(_.contains(“foo")).count
cachedMsgs.filter(_.contains(“bar")).count
上节课讲的例子是 spark 官网上的例子,用简单的几行代码告诉大家 Spark 的工作原理,cache 一定要解释一下,内存总的是有大小的,不可能无限制的把所有的案例全部放进去,所以当内存满的时候,会将内存里的东西写到硬盘上,需要时在读取回来,就像之前讲解的绘画状态一样,管理用的也是这种方式,但是这种 messenges 已经过滤出来的是想要的东西,不希望被写到硬盘,因为后面可能在频繁的使用,不希望被放入硬盘中,这样速度会慢,所以需要 cache,cache 动作就是像钉子一样钉到内存里,但是所有的 rdd 都是不能修改的,一旦创建出来不能修改,所以cache 成了另外一个 rdd,这就是大家看到的 spark 原理。
真正在执行的时候就可以看见在做操作时,因为有大量磁盘 io 操作,看到的 hdfs 会比较差,但是在 hdfs 中 因为有分布是存储,所以增加了副本提高容错性,为了容错必须有副本必须有磁盘 io,但是这样在 spark 世界中就会显得比较低效,所以才会说大家在内存中处理不要写入硬盘中,将所有的东西写入内存中,内存的速度快,所以比 Hadoop快,这是他的基本原理
2.spark 架构
工作时知道了前面的 Hadoop 以及刚才讲解的 storm,就可以知道了,大体的工作原理一样,首先要有驱动程序,编程时需 spark contest,还会有像 Hadoop中的 job tractor,或者 storm 中的 master 也就是 nimbus 节点,Spark 有cluster manager,后面有很多个 worker 节点,买个我可节点上包含一个 executor 执行若干个任务,所有的 rdd 都在内存中处理,这是 spark 的工作方式。在执行时有一个 manager 加一组 worker node,worker node 表示的是一个机器,机器之间是可以并行的,一个机器上可以跑多个 executor 进程,多个进程可以同时跑在多个盒上,每一个进程上有多个线程,也就是 task,这些 task 在并行的跑,在多个机器上面,每一个机器上有多个进程,多个进程有多个线程这样做并行,而且所有的操作都发生在内存中,速度非常快。
3.spark 运行模式
在跑的时候如果为大家演示,只能演示单机版,Master 和 slaves 在一台机器上,如果多台机器底下实际上是构建在Hadoop 上面,可以用一样的,可以用另一种方法 Mesos,这种方式不进行讲解,因为他被很多框架替换了,例如k8s(全称是kubernetes)代替了 docker,K8 s 很流行,很多企业都在用,一般 spark 也可以构建在K8 s上面,是现在更多的选择,扩展的东西非常多,不进行详细讲解。
4.spark rdd
spark 刚才可以看到所有的数据都在内存中,以弹性分布式数据集这种方式存在,是所有对象聚集在一起的一个不可变的分布式集合,不可变的原因在前面已经讲解了,因为是 Scala 语言写的,Scala 本身是函数式编程,在 y=f(x)的情况下x不应该发生变化,所以是不可变的,rdd可能很大,比如装进了整个数据集,在一台机器上放不下,整个数据集合起来可能有一个 t,每个可以被分割成小的部分,做分区的动作,每个分区可以分配到不同的节点上,这样在集群上去部署,rdd 的产生方式一种是由刚才通过加载外部数据集,自动读取进来就是 rdd,另外一种方式是在驱动程序中可能产生一个对象集合,使用 list 或者 set 方法,还有一种方法是刚才写的更多的,通过 lines 产生了其他的 rdd,通过转换,
lines=sparktextFile(“hdfs://..")
errors =lines.filter(_.startswith("ERROR)) messages = errors.map(_.split('\t’)(2)) cachedMsgs =messages.cache(
cachedMsgs.filter(_.contains(“foo")).count
cachedMsgs.filter(_.contains(“bar")).count
例如做过滤产生了 Erros,Error 做 map 产生 message,所以会不断的创建 rdd,其实创建 rdd 就是对某一个 rdd 在做转换生成新的 rdd,转换的目的是对一个rdd进行操作获得一个新的 rdd,刚才看到的 filter map union 都属于这一类,之前讲解过前两个,第三个还没有讲解到,这些已经对 rdd 转换得到了新的 rdd,如果得到的不是一个新的 rdd 而是值,单一的一个返回值要将其返回给提交任务的驱动程序或者写入到外部存储系统中的文件,这种操作不叫转换,叫做动作,例如 reduce,Reduce 是叫 map 的结果,将 rdd 中所有的内容最终转换成一个结果,Count 数 r dd 里面有多少值等等,简单讲对 rdd 可以进行操作,操作分两种,一种是产生了另外一个 rdd 操作这种操作称之为转换,另外一种是产生非 rdd 结果,比如一个值或者是一组简单的值,反正不再是一个 rdd,这个结果拿过来以后要给别的程序使用,所以是动作。rdd 只能被 spark 处理,不能被其他程序处理,所以转换是发生在中间的,Action 指的最终的是面向用户的返给action action 能识别的结果,在
lines=sparktextFile(“hdfs://..")
errors =lines.filter(_.startswith("ERROR)) messages = errors.map(_.split('\t’)(2)) cachedMsgs =messages.cache(
cachedMsgs.filter(_.contains(“foo")).count
cachedMsgs.filter(_.contains(“bar")).count
此程序里面,Filter 不会马上生成 error,一直没有生成,直到倒数第二个 count 才生成,想得到 count 要在第二行代码上实现,以此类推最终才到 lines 上面去做 Filter 动作,然后做相应的处理,动画的顺序执行到 count 时,才将任务分发到三台机器上,才真正在硬盘的 block 上读取数据,然后再返回结果,内容被 cache 住,不是按顺序执行,到 count 处才执行,而是要执行 count 在回推,才开始从 lines 底下执行,第二次执行 count 时才真正发生在 cache 中,这个操作的逻辑说明整个计算过程是一种 lazy calculate,就像在讲 Hibernate 称它为 lazy load 一样,数据没有马上被加载而是在访问时才进行加载,对 rdd 所有的变换,变换是指在spark中间进行的所有的操作,只有 action 是最终给用户返回的结果,所以不会立即执行,只有当执行动作时,例如执行count动作,需要给用户返回值,才向前进行追溯,将之前中间的能得到 count 的依赖的所有 rdd 依次一层一层向前推推到最前面,才去做所有的变换,这样做的好处是想加载的时候马上就加载,万一后面隔了一段很长时间才要求在上面做动作返回给用户值,前面做的动作就白费了,至少在很早的时候就已经占用内存了,显得不高效,所以借鉴的类似 于 hibernate 里面 lazy load 机制,做了 lazy calculate,以这种方式执行,在 lazy load 中真正要 get 属性时才会进行加载,在 lazy calculate 中当真正执行动作时,也就是要返回给驱动程序一个结果是才回向前倒推看哪一个变化去做,这就是 lazy calculate,主要的原因是尽量节约内存,当不得已必须执行计算时才真正的占用内存,在此之前不占用内存,所以内存使用效率显得非常高,刚才所讲的map属于变换中的一种,因为 map 出来的结果不能够直接返回,会产生rdd数据属于中间状态,Reduce之后才可以返回,map动作里执行的操作是自己写的,最终是要产生新的 rdd
nums = sc.parallelize([1, 2, 3,4])
squared = nums.map(lambda x: x * x).collect()
这里的rdd包含了四个值,使用的 lambda表达式,对x值进行计算,X 的平方,将他们的结果放到集合类里,返回squared 这样的 rdd,这就是 map
对应的还有一个 flat map,买个元素产生的不是一一对应的x的平方,而是要产生多个输出元素
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
例如这里有两个字符串,对于 lines 里的每个元素起一个名字叫做 line,对 line 用空格断开,断开以后会产生一个结果,但是不想变成一个像二维数组的东西
不想变成上面这样的,而是想得到下面这样的,需要使用 flat map,Flat 是摊平的意思,将其断开之后将大家都摊平在一起,一个一维数组样式的 rdd。
Filter 是在做过滤,想知道里面满足要求的都有什么
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lamda x: "error" in x)
warningsRDD = inputRDD.filter(lamda x: "warning" in x)
前面是输入的 rdd,对于输入 rdd 里面的每一行,每一个元素过滤里面包含 error 出来放到一 个 rdd 中,包含warning的放入一个 rdd 就叫过滤。
Union 是前面没讲过的,Union 是将两个 rdd 合起来
badLinesRDD = errorsRDD.union(warningsRDD)
前面两个 rdd 通过其中一个使用 union 传递另外一个进去,就产生了一个新的 rdd
比如这个将两个合起来
除了转换真正执行动作时才去执行,rdd 的第一个动作是 reduce
rdd = sc.parallelize([1, 2, 3,4])
sum = rdd.reduce(lambda x, y: x+ y)
这个 reduce 的意思是对于这个 rdd 里面的元素 x 和 y,计算一下 x+y,这是 lambda 表达式,含义是将里面的1234全都加起来,得到一个值,现在得到的这个值不再是 rdd,只是一个单一的值,所以 action 的结果是要给人或者交给程序看,不是在给 spark 看是给提交作业的程序看,是想返回给其他代码的结果,当 sum 执行到 reduce 动作时,才向前回推 reduce 在 rdd 上做,rdd 又是什么,才执行前面的代码,依次类推。
Count 是刚才讲过的,是用来计算数量的,计算 rdd 里面元素的数量。
二、Spark 程序流程示例
1.流程
首先从外部数据创建一些作为输入的 rdd,肯定要用一些变换定义出新的 rdd,其中有一些 rdd 是你认为必须要被cache住的,所以就将其做 cache,最终通过一些动作执行出结果,返回给用户,大体上是这样的逻辑,在执行操作时,因为 rdd 可能在多个集群上执行,操作与 map reduce 相类似,也有 shuffle 动作
每一个 map 在执行时都会把数据按照指定的红色放在相应位置,色放在相应位置,绿色也放在相应的位置,分开放,这是在 shuffle write 上,每一个 map 会按照一定的逻辑将其产生的结果放到三个不同的地方,Reuse 是到三个不同的地方读取其要处理的数据,叫做 Shuffle fetch,第一个 reduce 只读红颜色,第二个只读黄颜色,第三个只读绿颜色,这样就得到了一部分数据,前面讲的Hadoop中的 spill 动作,Spill 是这里看到的每一个小块,有多少个 map 分为多少块,有多少个map有多少个task去读,整个出来的小块儿就是 spill 文件,是 N×K 个,对于 Hadoop 来说所有的 spell 文件在一排,对于像spark这样的是存在内存里的,所以这些东西越多越复杂,Spark 这种效率就会显得越高。
2.举例
现在举一个稍微细节一点的例子,假设内存中持有一个非常大的用户信息表,其中包含用户 ID 和一些其他的相关信息,信息里面包含例如订阅的主题列表,一些新闻,就像前面讲的消息队列方式一样,包含了所有订阅的主题列表,这些用户肯定会不断的修改订阅信息,还有一个很小的表,只记录过去五分钟里在网页上点击过链接的事件,就会有用户 ID 和 link info,每隔五分钟就将这个小的表并到大的表里面,要做这样的事情操作方式是一种数据在大的 rdd里面,不做分区
不是分区用户的数据在最左面,右面是很小的数据表,在做处理时,因为不做分区,有可能用户信息在第一块,在五分钟,可能右面的数据表都进入了,这五分钟看了很多网页,这两部分在做交应时中间的结果会把左面右面每一块数据都拿到,左边所有的用户数据和右面的事件形成关联,例如中间处理从 a 到 g 些人的信息,要从 a 到 g 拿到所有用户的信息,再从事件中拿将其合并起来,如果带分区就把分区按照一定的逻辑处理,比如按照某种哈希方式处理,未来在合并时就会利用这些信息使合并更高效
val sc = new SparkContext(...)
val userData = sc.sequenceFile UserID, UserInfol("hdfs://...").partitionBy(new HashPartitioner(100)) // Create 100 partitions.persist()
创建一个 spark context,再创建一个顺序化的序列化文件,里面包含 user ID,user info 这种大的文件,大的 rdd,信息在分布式文件系统中,都系进来的时候组成 user date 这样的 rdd,组成的 rdd 很大,要将其进行分区,按照哈希的方式分区分为100块,Persist 是用来置换的,这样进来以后数据会按照哈希的方式分成100块,将来在做合并时可以这样做
大家注意,这张图与不进行分区时,左面不一样,数据不是在哪里都有,已经约定好按照哈西的方式未来在处理时一个一个的过来,这就是所谓的窄依赖,窄依赖是指父 rdd 的每个分区只被子 rdd 的一个分区所使用,宽依赖是指父rdd的每个分区可能被多个子 rdd 分区所使用
从初中看左边就是窄依赖,右面是宽依赖,窄依赖上边进行的操作 Map filterUnion 都是窄依赖,如果 rdd 实例两个rdd 表在进行合并,但是合并基础是这两个表都用刚才提到的某一种哈希算法做了分区,可以说只有固定的区才会合并,合并完以后的结果也在其中,不涉及到其它块,因为是按照哈希做的,所以有关的相同数据只会出现在相应的两个区里面,合并出来的结果一定在相应位置。如果没有两个相同算法去做分区做 join,那就需要把左边所有的数据合起来去做合并,这就是窄依赖和宽依赖。如果做宽依赖意味着很多节点的数据传输,rdd 分区数据丢失时,spark 会对数据进行重新计算,但是如果是窄依赖会发现,因为每一个子分区 rdd 只对应前面 rdd 的一个分区,所以前面 rdd 的分区丢失,只需要根据前面的窄依赖,例如右面第一个数据丢失,只需要它对应的左面第一个数据进行处理,但是如果是右面的宽一来丢失,要对三个都进行处理,这是宽依赖和窄依赖之间的差异。rdd 在计算时做转换或者动作时
在刚才看到的这些处理中,蓝颜色表示的是一般的 rdd,黑颜色表示的是已经被 cache 住的 rdd,现在可以看到如果要计算 g,要去计算 b,b 要计算一,但是 B 是被 cache 住的,所以之前已经有了,G 的结果丢掉了要重新恢复,从上面看需要计算 b,但是 b 已经被 cache 住了,所以不需要依赖于 a,已经在上一次依赖 a 的时候计算出的结果被cache住了,所以不需要依赖于 a,如果要重新计算 g 对于上面这条通路讲到 b 已经结束了,直接拿 b 缓存出来的结果就可以了,不需要重新计算 a。下面这条通路f需要重新计算,F 向前地也要重新计算,一也要重新计算,但是下面一条不用重新计算,所谓的 stage 就相当于在一个很大的 top 中在计算时是如何计算的,Stage a 在 stage 1里面进行转换,一旦完成就进入后面 b,B 被 cache 住,所以 ab 之间就隔离了,未来如果b想恢复结果,直接从 cache 里拿结果就可以,要恢复g必须有b和前面的 f,f 都没有被缓存,一直推到 c 都要计算,从计算的角度看,Stage 二从 cdef 要一起计算才能继续进行处理,一起计算完成以后要等 b 给 b,g 要拿到 b 和 f 才能去计算,所以计算分为三个部分,第一部分是stage a,第二部分是 stage 2,最后拿到 b 和 f 以后计算 g,也就是 stage 3,如果这样划分,每个阶段stage内部尽可能多的包含一组具有窄依赖关系比较好,因为要一起进行计算,所以 stage 2是窄依赖,D 和 f 以及 e 和 f 都是窄依赖,A 和 b 不是依赖,是宽依赖,宽依赖在两个 stage 里面执行,并且宽依赖一旦执行完操作会比较耗时,要将其 cache 住,这是spark 分了 stage 的原理,可以理解为 stage 相当于计算,到哪里可以停一下,像一句话打一个逗号,总共有几句。
3.spark SQL
在整个 spark 看到的都是一些 rdd,但是也可以执行 SQL 语句,执行 SQL 语句是因为 spark 中可以处理结构化数据的模块,结构化数据包括数据库表,以及 data frame 对象,如果学过 Python 会更好的理解 data frame,他们都是二维表格类型的数据,所以是结构化数据,对,结构化数据当然用结构化查询语言查询是非常高效的,Data frame 实际上是二维表格数据
只不过是在内存里的结构化的表格,rdd person 转化成 date frame 就是上图所示,每一个人都有姓名,年龄以及身高。这里要提一个概念,也许现在大家没什么感受,就是在数据库里面看到数据是如何存储的,可以回想一下数据结构时候所讲的数据是如何存储的,存储时可以有这样的方式
这个表有 abc 三行三列,在存储的时候,可以向上面这样按行存储,按行存储是在硬盘或者内存里,比如在spark中看到的是内存,在数据库中看到的是硬盘,在地址上先存一行,一行的内容是连续存储的,在存储第二行,这是按行存储,按行存储的好处是如果建立了索引要读取这一行数据时非常快,就按照内存读取或者按硬盘读取这么长一段内容出来,就是这一行的内容,在进行数据库处理时,特别适合于事务型的数据处理,事务是在表上访问一行或者几行加个锁,让别人不能改写,然后做处理,例如产生新的交易就是这种操作,事物本身就是读取一行,按行存储访问速度快,在硬盘或者内存里连续访问一个空间,把内存中的一行或硬盘中的一行全拿出来,但是不利于另外一个操作,另外一个操作是分析类的操作,联机分析处理或在线事务处理,olap 想知道一列信息,例如统计订单的总额是多少,或者订单的数量是多少,做类似于这种操作,如果这种操作在行式存储上会有问题,想得到 ae 在第一行读取,在到第二行读取,再到第三行读取,效率会很低,如果是列式存储,列式存储是将所有的列存在一起,所以 a1a2a3连续存储,b1b2b3连续存储,c1c2c3连续存储,如果是这样存储方式在 a 这一列算总数或者 count 是非常容易的,定位到这一列连续去读取就将所有的列读取出来的,这种读取方式比较适合列式的操作,适合这种分析式的操作,不一定哪一种方式一定好哪一种方式就一定差,适用的场景不一样,所以现在有人提出来是否能做一个混合存储,既有行式也有列式,或者有些数据是行式有些数据是列式,主要是看数据的访问类型是什么,例如阿里云上有一个合作项目,在做混合式存储研究,根据数据被访问的情况来决定数据应该是行式存储还是列式存储,Spark 里面推荐使用列式存储,就是底下这种方式,spark SQL 的写法特别适合列式存储,这是spark与关系型数据库行式存储有很大差异的地方,当表列较多时,可以从中挑选出所需的列,把相同的列取出来存在一起,这样存在一起还有一个附加的好处是方便编码和压缩,如果 a 是姓名,b 是年龄,C 是身高,做行式存储的时候很难压缩,但是做列式存储的时候就比较容易压缩了,例如年龄是18 18 18 18 19 19 19 20,就可以存第一列到第四列是18,第五列到第七列19,就不用存四个18,这样可以进行压缩,所以更容易压缩,如果横着看是张三18岁,身高1.8m,这样的一列数据无法进行压缩,所以列式存储的另外好处是方便进行压缩,在整个 spark 中,采用的数据存储方式是列式存储,为了支持 spark SQLData frame采用了列式存储
接下来查看如何操作,首先要做准备工作,创建 spark SQL contest,所有动作都是基于此的,与之前讲解数据库访问一样,对于 JDBC 关系型数据库要指定格式表的内存,用户名,密码,数据库,这样就真正连到了数据库上,然后load
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc")
.options(url = table["url"])\.options(user = table["user"])
.options(password = table["password"]).options(dbtable = table["dbname"]).load()
这张表里的数据就被加载到了 Data frame 里,如果 data frame 加载的不是数据库的东西,是其他东西,例如是个文件 CSV 文件,要说明路径在哪里,CSV 文件是用,或者空格分隔的,要不要自动识别schema,有没有文件头,指定分隔符是什么,漏就将其放在了 data frame 里
tmp = sqlContext.read.format("csv")\
.options(path = table["path"])\.options(inferschema = "true")\.options(header = "true")\.options(sep = ",")
df=tmp.load()
无论是 CSV 还是数据库里面的表,只要 load 进来变成date frame以后,对象的类型和存储的方式就一致了,除了这两种,还支持关系型数据库,hive,MongoDB/JSON/CSV/RDD/.….)将其加载为 frame 操作,一旦进来变成 data frame 大家就都是一样的了,漏的这种操作,无论是哪一种文件格式仍然是 lazy 的机制,也就是在这里不代表 data frame 中有东西,实际是用在 data frame 中,执行相应的操作时,执行某一种 action 时才真正跑过去加载数据,不要忘了,data frame 仍然是在内存里的一种对象,只要没有在上面进行执行 action,将其加载进来就是一种浪费,所以仍然 lazy 机制。这是 load 进来了,如果想写数据使用 write 方法指定格式
tmp = df.write.format("jdbc")\
.options(url = table["url"])\
.options(user = table["user"])\
.options(password = table["password"])\
.options(dbtable = table["dbname"])
tmp = tmp.mode("append")
tmp.save()
指定 URL,用户名,密码等等,还可以指定是要追加还是直接覆盖,这个 data frame 就被写了进去,模式包含追加,覆盖,忽略,如果数据已经存在了直接覆盖,与覆盖有些差异,覆盖会进行解释,在向下是如果数据已经存在了会抛一个异常,不允许覆盖也不会忽略它直接进行覆盖,会抛异常,默认情况下是使用 default case 方式,因为已有的数据直接覆盖掉是有问题的,因为默认是 default case,做追加要说明是追加的。
查询是最复杂的,因为有很多条件
# 假设我们封装了之前的 load 函数,获得 order 的 DataFrame order = load(sqlContext,conf,"order")
# 注册 order 的表名为 o
order.registerTempTable("o")
#设置 sq1语句
sql = "SELECT user, amount FROM O"
# 获取新的 DataFrame
result = sq1Context.sql(sq1)
#在控制台打印 DataFrame,此处,上述语句才真正得以执行 result.show()
查询看起来就像 SQL 的语句,Spark SQL 就与 hibernate SQL 一样基本上像 SQL 一样,按照 SQL 查询的对象是 Data frame,反复强调的是 data frame 的操作依然是 lazy 的机制,除非最后执行 action,要记住想要干什么,真正要做的动作是在后面才执行,不是一开始让做,例如要加载转换就做,只是相当于记录了动作,一直到执行 action 的时候,回溯表,真正要执行 action 时,应该做些什么,才开始去做,做完返回结果,反复要讲这样做的好处是节约 内存,内存使用率会提高,不需要返回结果时,压根儿不做任何操作不去占用内存,但要返回结果时,才向前进行操作,至少从让去加载和 action 进行处理真正加载中间过程这些内存都是没有被占用的,就可以充分的被其他程序利用,这就是spark 的工作原理,只是简单的介绍了其中一些的基本东西,spark不止这些东西,支持的语言很多,这样的 API可以到这三个地方去查看
Python
-http://spark.apache.org/docs/latest/api/python/index.html
Java
-http://spark.apache.org/docs/latest/api/java/index.html
Scala
-http://spark.apache.org/docs/latest/api/scala/index.html
这一部分内容讲的比较杂,有 Hadoop,storm,以及 spark,他们三者的关系是 Hadoop 相当于基础,处理批处理的东西,利用硬盘进行处理,Storm 处理流式的数据,但是微观上流式数据也不是进来一条处理一条,为了提高计算效率,可以认为是一个 mini batch,每隔一秒钟处理一下,在这一秒钟进入的所有数据当成mini batch进行处理,但是从宏观看会认为是流式处理,数据源源不断的进来进行处理,Swag从逻辑中支持map reduce,也有spark Streaming 流式处理,前面两个在 spark 中都可以实现,批处理和流处理都可以实现,在 spark 中操作完全是在内存中的,在频繁的使用内存也考虑了一下,内存中存放了所有的 rdd 不一定会放得下,所以当 rdd 在内存里导致内存使用没了以后也会写入到硬盘,但是此时是作为过渡,将数据持久化,在 spark 中会看到有 spark SQL 在进行处理结构化数据,Data frame 是按列存储数据的,总的来看 spark 支持前两个这种的批处理和流式操作,Spark 是在内存中处理的,效率比较高,底层实际是构建在 Hadoop 上,也可以不用 Hadoop,但是需要底层这样的组件,一般使用Hadoop,Spark 像一个集大成者,所以大家使用 spark 非常多,现在的例子在一个机器上很难跑,在实验室的服务器上跑了一个,如果要跑 spark,做的是 worker count 处理的结果是将文本处理完算出来每一个 word 出现多少次,文本非常大,所以处理结果会非常多,这个例子是在服务器上跑 docker,跑了几个 Hadoop 的 worker 节点,在用 worker节点跑 worker count
是对给定的输入目录做处理将其写出到输出目录里,写出的文件内容是 worker count 内容,本身看这个例子可能看不出什么,因为背后运行的东西看不见
JavaRDD<5tring> textFile=sc.textFile("hdfs://...");
JavaPairRDD<String, Integer>tounts=textFile
.flatMap(s -> Arrays.asList(s.split(""))iterator()).mapToPair(word -> new Tuple2<(word1)).reducebyker((a. b)->a+b): counts.saveAsTextFile("hdfs://...");
代码上面是 worker count 的逻辑,从 text file 中分布式文件系统中加载文件,生成了 rdd,因为是 JAVA 代码,所以生成的是 JAVArdd,对于文件要进行处理,第一步是 Flat map,Flat map 是将里面的内容处理完,全部变成一维数组,不要变成二维,用空格把里面的每一行全部断开,再将他们全部放入到一个数组里,这样相当于把整篇中的全部单词拿出来了,拿到以后产生了 map 对,产生了一些 tuple,里面遇到的每一个单词产生的单词唯一,出现了一次,然后 reduce by key,按照 key 去 reduce,如果这两个 key一样,就把他们的值加起来,不断的去 reduce 得到每一个 key 的总数是多少,按 key 来,如果后面跟的一个是 a,一个是 b,这两个 word 一样,就会将 a 和 b 加起来作为总数,最后出来的结果就是每一个单词出现了多少次,最后将其写入文本文件中得到结果,真正在跑的时候会指定 master 在哪里,要有一个 conf,要设置 conf 中的 master 是谁,就是刚刚配置的信息,也要包含输入信息,在这个目录中的文件做处理产生的文件写到 output 中,再跑一下 Wordcount,跑了代码就得到了刚才的逻辑
public class Testspark{
public static void main(String[] args){
Stringmaster="spark://172.18.0.2:7077"; SparkConfconf=newSparkConf); conf.setAppName("TestSpark"): conf.setMaster(master);
JavaSparkContext sc=newJavaSparkContext(conf);
string input ="hdfs://172.18.0.2:9000/example/big 1e0.txt". String output ="hdfs://172.18.0.2:9000/output2"; Operations.wordCount(sc,input, output);
这是三个分布式框架的各自差异,这里只是简单的介绍了皮毛,如果要深入了解,需要花很大的精力。