1:Spark中的Python和Scala的shell
2:Spark核心概念简介
3:独立应用
4:Spark数据集
一:Spark中的Python 和Scala 的shell
1:shell设置显示日志
进入Spark的安装目录,启动spark的集群,输入bin/pyspark,但此时会伴有大量的日志信息,在这里想要缩减启动信息的显示,可以调整日志的级别来控制输出的信息量,在conf目目录下创建log4j.properties的文件来管理系统日志设置,Spark开发者默认在conf下已经加入了日志设置的模板,为 log4l,proper.template,赋值一份修改为 log4j.properties找到
log4j.rootCategory=INFO,console
然后通过下面的设定降低日志的优先级,只显示警告和更严重的信息
log4j.rootCategory=WARN,console
2:利用python-shell进行行数统计
>>> lines = sc.textFile("file:///usr/local/hadoop/spark/README.md") #创建一个lines的RDD >>> lines.count() #统计RDD中元素的个数 95 >>> lines.first() #返回RDD中的第一个元素,也就是README.md的第一行 u'# Apache Spark' >>>需要注意的是这里的文件路径,如果是本地文件,需要加上 file:// 若是hdfs文件则路径为:hdfs://ip:9000/xxx/xxx
textFile的参数是一个path,这个path可以是:
1. 一个文件路径,这时候只装载指定的文件
2. 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件)
3. 压缩文件读取,如 textFile(”/my/directory/*.gz“) 注意必须是gzip压缩的
4. 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件
第四点是一个使用小技巧,现在假设我的数据结构为先按天分区,再按小时分区的,在hdfs上的目录结构类似于:
/user/hdfs/input/dt=20130728/hr=00/
/user/hdfs/input/dt=20130728/hr=01/
...
/user/hdfs/input/dt=20130728/hr=23/
具体的数据都在hr等于某个时间的目录下面,现在我们要分析20130728这一天的数据,我们就必须把这个目录下面的所有hr=*的子目录下面的数据全部装载进RDD,于是我们可以这样写:sc.textFile("hdfs://n1:8020/user/hdfs/input/dt=20130728/hr=*/"),注意到hr=*,是一个模糊匹配的方式。
3:Scala进行函数统计
scala> val lines = sc.textFile("file:///usr/local/hadoop/spark/README.md") lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/hadoop/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27 scala> lines.count() res0: Long = 95 scala> lines.first() res1: String = # Apache Spark scala>
二:Spark核心概念简介
1:RDD(弹性分布数据集)
(1):从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建
(2):从父RDD转换得到新RDD
2:驱动器程序(driver program)
每个spark应用用驱动器程序发起集群上的各种并行操作,包含了应用的main函数,并定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作
驱动器程序通过一个sparkContext对象来访问Spark,这个对象代表对计算机集群的一个连接,shell在启动时就已经创建了一个SparkContext对象,是一个sc变量,一旦有了SparkContext就可以用它来创建RDD(弹性分布数据集)
3:执行器(executor)
在Worker Node上为某Application启动一个进程,该进程负责运行任务,并且负责将数据存在硬盘或者内存中;每个Application都有各自独立的executors;
4:Application
基于spark的用户程序,包含了一个Driver Program以及集群上中多个executor;
5:Cluster Manager
在集群上获取资源的外部服务。如:standalone、yarn、mesos;6:Worker Node
7:Job
8:Stage
9:Task
10:Partition
三:独立应用
1:Python中初始化Spark
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf) #<span style="font-family:Microsoft YaHei;">以上为初始化sc</span> line = sc.textFile("file:///usr/local/hadoop/spark/README.md") print line.count() print line.first()执行:bin/spark-submit my_script.py
运行结果:
2:Scala中初始化Spark
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext_ val conf = new SparkConf().setMaster("local").setAppName("My App") val sc = new SparkContext(conf)
3:在Java中初始化Spark
import org.apache.spark.SparkConf import org.apache.spark.api.java.JavaSparkContext SparkConf conf = new SparkConf().setMaster("local").setAppName("My App") JavaSparkContext sc = new JavaSparkContext(conf)
四:Spark数据集
– Spark 可以将任何 hadoop 所支持存储资源转化成 RDD ,如本地文件、 HDFS 、 Cassandra 、 HBase, Amazon S3 等。
– Spark 支持 text files, SequenceFiles 和任何 Hadoop InputFormat 格式
●使用 textFile() 方法可以将本地文件或 HDFS 文件转换成 RDD
– 如果读取本地文件,各节点都要有该文件;或者使用网络共享文件
– 支持整个文件目录读取,如 textFile("/my/directory")
– 压缩文件读取,如 textFile("/my/directory/*.gz")
– 通配符文件读取,如 textFile("/my/directory/*.txt")
– textFile() 有可选的第二个参数 slice ,默认情况下,为每个 block 创建一个分片,用户也可以通过 slice 指定更多的分片,
但不能使用少于 block 数的分片
●使用 wholeTextFiles() 读取目录里面的小文件,返回 ( 文件名,内容 ) 对
●使用 sequenceFile[K,V]() 方法可以将 SequenceFile 转换成 RDD
●使用 hadoopRDD() 方法可以将其他任何 Hadoop 的输入类型转化成 RDD