大数据生态圈简介
大数据生态圈可以分为7层,总的可以归纳为数据采集层、数据计算层和数据应用层。
spark
1.简介
spark是一种计算引擎,类似于hadoop架构下mapreduce,与mapreduce不同的是将计算的结果存入hdfs分布式文件系统。spark则是写入内存中,像mysql一样可以实现实时的计算,包括SQL查询。
spark不单单支持传统批量处理应用,更支持交互式查询、流式计算、机器学习、图计算等各种应用,
spark是由scala语言开发,具备python的接口,pyspark。
2.spark组件
spark包含着多个紧密集成的组件,如图所示:
2.1 spark core
实现spark基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。
同时也包含对弹性分布式数据集(RDD),RDD表示分布在多个计算节点上可以并行操作的元素集合。
2.2 spark sql
spark sql用来操作结构化数据的程序包,我们可以使用sql或者hive语言来查询数据。
2.3 spark streaming
spark streaming上对实时数据进行流式计算的组件。例如:在网页服务日志,或者在网络服务中用户提交的状态更新组成的队列。
2.4 mlib
mlib提供机器学习功能程序库,提供多种机器学习算法
2.5 graphx
Graphx用来操作图,可以进行并行的图计算
2.6 集群管理器
Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计
算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器。
搭建spark集群
- 步骤1:搭建hadoop单机和伪分布式环境
- 步骤2:构造分布式hadoop集群
- 步骤3:构造分布式spark集群
3.RDD编程
3.1RDD基础
实例1:读取外部数据集,并调用转化操作filter提取包含“python”的字符串,并调用first()行动,返回第一个包含python的字符串。
#初始化SparkContext import pyspark from pyspark import SparkContext,SparkConf #配置应用 conf=SparkConf().setMaster("local").setAppName("My App") #基于sparkconf创建一个sparkcontext sc=SparkContext(conf=conf) #读取外部数据 lines=sc.textFile("README.md") pythonlines=lines.filter(lambda line:"python" in line) pythonlines.first() out:u'## Interactive Python Shell'
实例2:spark的RDD会对每次行动进行重新计算,如果想复用同一个RDD,使用RDD.persist(),将RDD内容保存到内存中
pythonlines.persist pythonlines.count() pythonlines.first() out:u'## Interactive Python Shell'
3.2创建RDD
实例1:将程序中一个已有集合传递给SparkContext的parallelize()
#内部创建数据 lines=sc.parallelize(["pandas","i like pandas"]) #外部读取数据 lines=sc.textFile("/path/to/README.md")
3.3RDD操作
3.3.1 转化操作
实例1:假定有一个日志文件log.txt,内部含若干信息,希望提取出其中的错误信息
inputRDD=sc.textFile("log.txt") errorsRDD=inputRDD.filter(lambda x:"error" in x)
实例2:打印包含error或warning的行数
errorsRDD=inputRDD.filter(lambda x:"error" in x) warningsRDD=inputRDD.filter(lamdba x:"warning" in x) badlinesRDD=errorsRDD.union(warningsRDD)
3.3.2 行动操作
实例1:输出badlinesRDD的一些信息,count()返回计数结果,take()收集RDD部分元素,collect()获取整个RDD数据
print("Input had"+badlinesRDD.count()+"concerning lines") print("here are 10 examples:") for line in badlinesRDD.take(10): print line
3.4向spark传递函数
实例1:
#1 word=rdd.filter(lambda s:"error" in s) #2 def containserrors(s): return "error" in s word=rdd.filter(containserror)
实例2:
class wordfunctions(object): def getmatchesnoreference(self,rdd): query=self.query return rdd.filter(lambda x:query in x)
3.5常见转化操作和行动操作
3.5.1 基本RDD
map()和filter()
实例1:计算RDD中各值的平方
nums=sc.parallelize([1,2,3,4]) squared=nums.map(lambda x:x*x).collect() for num in squared: print "%i "(num)
实例2:使用flatMap()将行数据划分为单词
lines=sc.parallelize(["hello world","hi"]) words=lines.flatMap(lambda line:line.split(" ")) words.first()
其他转化操作:
集合操作
RDD笛卡儿积
转化操作列表
行动操作列表
4.键值对操作
4.1 创建Pair RDD
集合:(key,value)
pairs = lines.map(lambda x: (x.split(" ")[0], x)) • 1
对键值对集合{(1, 2), (3, 4), (3, 6)}为例
转化操作:
针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})
import os import pyspark from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]") sc = SparkContext(conf=conf) # 使用 parallelize方法直接实例化一个RDD rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量 rdd.take(100) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] """ ---------------------------------------------- Transform算子解析 ---------------------------------------------- """ # 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。 # 1. map: 和python差不多,map转换就是对每一个元素进行一个映射 rdd = sc.parallelize(range(1, 11), 4) rdd_map = rdd.map(lambda x: x*2) print("原始数据:", rdd.collect()) print("扩大2倍:", rdd_map.collect()) # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] # 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维 rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"]) print("原始数据:", rdd2.collect()) print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect()) print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect()) # 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']] # 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark'] # 3. filter: 过滤数据 rdd = sc.parallelize(range(1, 11), 4) print("原始数据:", rdd.collect()) print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect()) # 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 过滤奇数: [2, 4, 6, 8, 10] # 4. distinct: 去重元素 rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32]) print("原始数据:", rdd.collect()) print("去重数据:", rdd.distinct().collect()) # 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32] # 去重数据: [4, 8, 16, 32, 2] # 5. reduceByKey: 根据key来映射数据 from operator import add rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print("原始数据:", rdd.collect()) print("原始数据:", rdd.reduceByKey(add).collect()) # 原始数据: [('a', 1), ('b', 1), ('a', 1)] # 原始数据: [('b', 1), ('a', 2)] # 6. mapPartitions: 根据分区内的数据进行映射操作 rdd = sc.parallelize([1, 2, 3, 4], 2) def f(iterator): yield sum(iterator) print(rdd.collect()) print(rdd.mapPartitions(f).collect()) # [1, 2, 3, 4] # [3, 7] # 7. sortBy: 根据规则进行排序 tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()) print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()) # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] # 8. subtract: 数据集相减, Return each value in self that is not contained in other. x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) y = sc.parallelize([("a", 3), ("c", None)]) print(sorted(x.subtract(y).collect())) # [('a', 1), ('b', 4), ('b', 5)] # 9. union: 合并两个RDD rdd = sc.parallelize([1, 1, 2, 3]) print(rdd.union(rdd).collect()) # [1, 1, 2, 3, 1, 1, 2, 3] # 10. interp: 取两个RDD的交集,同时有去重的功效 rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3]) rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) print(rdd1.interp(rdd2).collect()) # [1, 2, 3] # 11. cartesian: 生成笛卡尔积 rdd = sc.parallelize([1, 2]) print(sorted(rdd.cartesian(rdd).collect())) # [(1, 1), (1, 2), (2, 1), (2, 2)] # 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量 x = sc.parallelize(range(0, 5)) y = sc.parallelize(range(1000, 1005)) print(x.collect()) print(y.collect()) print(x.zip(y).collect()) # [0, 1, 2, 3, 4] # [1000, 1001, 1002, 1003, 1004] # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] # 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。 rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"]) rdd_index = rdd_name.zipWithIndex() print(rdd_index.collect()) # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)] # 14. groupByKey: 按照key来聚合数据 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(rdd.collect()) print(sorted(rdd.groupByKey().mapValues(len).collect())) print(sorted(rdd.groupByKey().mapValues(list).collect())) # [('a', 1), ('b', 1), ('a', 1)] # [('a', 2), ('b', 1)] # [('a', [1, 1]), ('b', [1])] # 15. sortByKey: tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] print(sc.parallelize(tmp).sortByKey(True, 1).collect()) # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] # 16. join: x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) print(sorted(x.join(y).collect())) # [('a', (1, 2)), ('a', (1, 3))] # 17. leftOuterJoin/rightOuterJoin x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2)]) print(sorted(x.leftOuterJoin(y).collect())) # [('a', (1, 2)), ('b', (4, None))] """ ---------------------------------------------- Action算子解析 ---------------------------------------------- """ # 1. collect: 指的是把数据都汇集到driver端,便于后续的操作 rdd = sc.parallelize(range(0, 5)) rdd_collect = rdd.collect() print(rdd_collect) # [0, 1, 2, 3, 4] # 2. first: 取第一个元素 sc.parallelize([2, 3, 4]).first() # 2 # 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存 m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() m # {1: 2, 3: 4} # 4. reduce: 逐步对两个元素进行操作 rdd = sc.parallelize(range(10),5) print(rdd.reduce(lambda x,y:x+y)) # 45 # 5. countByKey/countByValue: rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(sorted(rdd.countByKey().items())) print(sorted(rdd.countByValue().items())) # [('a', 2), ('b', 1)] # [(('a', 1), 2), (('b', 1), 1)] # 6. take: 相当于取几个数据到driver端 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(rdd.take(5)) # [('a', 1), ('b', 1), ('a', 1)] # 7. saveAsTextFile: 保存rdd成text文件到本地 text_file = "./data/rdd.txt" rdd = sc.parallelize(range(5)) rdd.saveAsTextFile(text_file) # 8. takeSample: 随机取数 rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 这里的 4 指的是分区数量 rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 参数1:代表是否是有放回抽样 rdd_sample # 9. foreach: 对每一个元素执行某种操作,不生成新的RDD rdd = sc.parallelize(range(10), 5) accum = sc.accumulator(0) rdd.foreach(lambda x: accum.add(x)) print(accum.value) # 45
5.数据读取与保存
spark支持很多种输入输出源,一部分原因spark本身基于hadoop生态圈而构建,特别说spark可以通过HadoopMapReduce所使用的InputFormat和OutputFormat接口访问。
5.1 文本文件
读取文本文件,保存文件
data=sc.textFile("file://home/README.md") data.saveAsTextFile(outputFile)
5.2 JSON文件
import json data=input.map(lambdax:json.loads(x)) data.filter(lambda x:x["lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile)
5.3 逗号分隔值与制表符分隔值
import csv import StringIO def loadRecord(line): input=StringIO.stringIO(line) reader=csv.DictReader(input,fieldnames=["name","favouriteAnimal"]) return reader.next() input=sc.textFile(inputFile).map(loadRecord)