Spark 的 Shell操作,核心概念,构建独立应用

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 1:Spark中的Python和Scala的shell 2:Spark核心概念简介 3:独立应用 4:Spark数据集 一:Spark中的Python 和Scala  的shell        1:shell设置显示日志        进入Spark的安装目录,启动spar...

1:Spark中的Python和Scala的shell

2:Spark核心概念简介

3:独立应用

4:Spark数据集

一:Spark中的Python 和Scala  的shell

       1:shell设置显示日志

       进入Spark的安装目录,启动spark的集群,输入bin/pyspark,但此时会伴有大量的日志信息,在这里想要缩减启动信息的显示,可以调整日志的级别来控制输出的信息量,在conf目目录下创建log4j.properties的文件来管理系统日志设置,Spark开发者默认在conf下已经加入了日志设置的模板,为 log4l,proper.template,赋值一份修改为 log4j.properties找到

       log4j.rootCategory=INFO,console

       然后通过下面的设定降低日志的优先级,只显示警告和更严重的信息

      log4j.rootCategory=WARN,console


      2:利用python-shell进行行数统计

>>> lines = sc.textFile("file:///usr/local/hadoop/spark/README.md")             #创建一个lines的RDD
>>> lines.count()                                                               #统计RDD中元素的个数
95
>>> lines.first()                                                               #返回RDD中的第一个元素,也就是README.md的第一行
u'# Apache Spark'
>>> 
      需要注意的是这里的文件路径,如果是本地文件,需要加上 file://       若是hdfs文件则路径为:hdfs://ip:9000/xxx/xxx

      textFile的参数是一个path,这个path可以是:

      1. 一个文件路径,这时候只装载指定的文件

      2. 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件

      3. 压缩文件读取,如 textFile(”/my/directory/*.gz“) 注意必须是gzip压缩的

     4. 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件

      第四点是一个使用小技巧,现在假设我的数据结构为先按天分区,再按小时分区的,在hdfs上的目录结构类似于:

      /user/hdfs/input/dt=20130728/hr=00/

      /user/hdfs/input/dt=20130728/hr=01/

      ...

      /user/hdfs/input/dt=20130728/hr=23/

      具体的数据都在hr等于某个时间的目录下面,现在我们要分析20130728这一天的数据,我们就必须把这个目录下面的所有hr=*的子目录下面的数据全部装载进RDD,于是我们可以这样写:sc.textFile("hdfs://n1:8020/user/hdfs/input/dt=20130728/hr=*/"),注意到hr=*,是一个模糊匹配的方式。

    3:Scala进行函数统计

scala> val lines = sc.textFile("file:///usr/local/hadoop/spark/README.md")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/hadoop/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27

scala> lines.count()
res0: Long = 95

scala> lines.first()
res1: String = # Apache Spark

scala> 

二:Spark核心概念简介

       1:RDD(弹性分布数据集)

        RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
RDD的特点:
  (1). 它是在集群节点上的不可变的、已分区的集合对象。
  (2). 通过并行转换的方式来创建如(map, filter, join,etc)。
  (3). 失败自动重建。
  (4). 可以控制存储级别(内存、磁盘等)来进行重用。
  (5). 必须是可序列化的。
        (6). 是静态类型的。
       RDD创建的两种方式

        (1):从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建

        (2):从父RDD转换得到新RDD


       2:驱动器程序(driver program)

       每个spark应用用驱动器程序发起集群上的各种并行操作,包含了应用的main函数,并定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作

       驱动器程序通过一个sparkContext对象来访问Spark,这个对象代表对计算机集群的一个连接,shell在启动时就已经创建了一个SparkContext对象,是一个sc变量,一旦有了SparkContext就可以用它来创建RDD(弹性分布数据集)

     

       3:执行器(executor)

      在Worker Node上为某Application启动一个进程,该进程负责运行任务,并且负责将数据存在硬盘或者内存中;每个Application都有各自独立的executors

       比如:应用A在一个Node上启动Executor,B应用也在同一个Node上启动Executor,他们各自的Executor是相互隔离的,运行在不同的JVM上。不同的应用对应不同的Executor;

      4:Application

       基于spark的用户程序,包含了一个Driver Program以及集群上中多个executor;

      spark中只要有一个sparkcontext就是一个application
       启动一个spark-shell也是一个application,因为在启动shark-shell时就内置了一个sc(SparkContext的实例);
 

     5:Cluster Manager

      在集群上获取资源的外部服务。如:standalone、yarn、mesos;
   各种不同的集群的区别:只是任务调度的粗细粒度不同,对学习spark没有影响,自己在学习时使用standalone即可;

     6:Worker Node

      集群中任何一个可以运行Application代码的节点
      可以在Worker Node启动Executor进程;

     7:Job

      包含很多task的并行计算,spark中的一个action对应一个job,如:collect、count、saveAsTextFile;
      用户提交的Job会提交给DAGScheduler,Job会被分解成Stage(TaskSet) DAG
      RDD的transformation只会记录对元数据的操作(map/filter),而并不会真正执行,只有action触发时才会执行job;

     8:Stage

     个Job会被拆分成多组任务,每组任务被称为一个Stage,可称为TaskSet
     一个stage的边界往往是从某个地方取数据开始(如:sc.readTextFile),在shuffle时(如:join、reduceByKey等)终止
     一个job的结束(如:count、saveAsTextFile等)往往也是一个stage的边界;
     有两种类型的Stage:shuffle和result;

    9:Task

     被送到executor上的工作单元
     在Spark中有两类Task:shuffleMapTask和ResultTask,第一类Task的输出是shuffle所需数据,第二类task的输出是result;
  stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage;
     比如:rdd.parallize(1 to 10).foreach(println)这个操作没有shuffle,直接就输出了,那么它的task就是resulttask,stage也只有一个;
     如果rdd.map((x,1)).reduceByKey(_+_).foreach(println),这个job因为有reduce,所以有个一shuffle过程,那么reduceByKey之前是一个stage,执行shuffleMapTask,输出shuffle所需要的数据,reduceByKey到最后是一个stage,直接就输出结果了。
     如果一个job中有多次shuffle,那么每个shuffle之前都是一个stage;

         

    10:Partition

     partition类似hadoop的split,计算是以partition为单位进行的

三:独立应用

      1:Python中初始化Spark


from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)                      #<span style="font-family:Microsoft YaHei;">以上为初始化sc</span>
line = sc.textFile("file:///usr/local/hadoop/spark/README.md")
print line.count()
print line.first()
       执行:bin/spark-submit my_script.py

       运行结果:

                     

       2:Scala中初始化Spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext_

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

      3:在Java中初始化Spark

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App")
JavaSparkContext sc = new JavaSparkContext(conf)

四:Spark数据集

       – Spark 可以将任何 hadoop 所支持存储资源转化成 RDD ,如本地文件、 HDFS 、 Cassandra 、 HBase, Amazon S3 等。
       – Spark 支持 text files, SequenceFiles 和任何 Hadoop InputFormat 格式
            ●使用 textFile() 方法可以将本地文件或 HDFS 文件转换成 RDD
                  – 如果读取本地文件,各节点都要有该文件;或者使用网络共享文件
                  – 支持整个文件目录读取,如 textFile("/my/directory")
                  – 压缩文件读取,如 textFile("/my/directory/*.gz")
                  – 通配符文件读取,如 textFile("/my/directory/*.txt")
                  – textFile() 有可选的第二个参数 slice ,默认情况下,为每个 block 创建一个分片,用户也可以通过 slice 指定更多的分片,

                      但不能使用少于 block 数的分片
            ●使用 wholeTextFiles() 读取目录里面的小文件,返回 ( 文件名,内容 ) 对
            ●使用 sequenceFile[K,V]() 方法可以将 SequenceFile 转换成 RDD
            ●使用 hadoopRDD() 方法可以将其他任何 Hadoop 的输入类型转化成 RDD

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
6月前
|
人工智能 机器人 Shell
【shell】shell字符串操作(声明、长度、拼接、切片、转换、替换等操作)
【shell】shell字符串操作(声明、长度、拼接、切片、转换、替换等操作)
|
6月前
|
Shell Linux C语言
【Shell 命令集合 磁盘管理 】Linux 控制光驱或可移动媒体设备的弹出和关闭操作 eject命令使用教程
【Shell 命令集合 磁盘管理 】Linux 控制光驱或可移动媒体设备的弹出和关闭操作 eject命令使用教程
111 1
|
6月前
|
存储 算法 Shell
【Shell 命令集合 磁盘维护 】Linux 对软盘进行格式化操作 fdformat命令使用指南
【Shell 命令集合 磁盘维护 】Linux 对软盘进行格式化操作 fdformat命令使用指南
78 0
|
2天前
|
XML JSON 监控
Shell脚本要点和难点以及具体应用和优缺点介绍
Shell脚本在系统管理和自动化任务中扮演着重要角色。尽管存在调试困难、可读性差等问题,但其简洁高效、易于学习和强大的功能使其在许多场景中不可或缺。通过掌握Shell脚本的基本语法、常用命令和函数,并了解其优缺点,开发者可以编写出高效的脚本来完成各种任务,提高工作效率。希望本文能为您在Shell脚本编写和应用中提供有价值的参考和指导。
10 1
|
1月前
|
存储 移动开发 网络协议
【实战指南】从零构建嵌入式远程Shell,提升跨地域协作效率(2)
本文《从零构建嵌入式远程Shell》的第二部分,介绍了如何通过模块化设计和功能增强来优化远程Shell,包括支持阻塞命令、增加用户主动结束Shell进程的能力等,提升了跨地域协作效率。文中提供了详细的代码示例和验证步骤,适合开发者深入学习。
44 5
|
1月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
47 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
39 0
|
5月前
|
移动开发 监控 安全
【实战指南】从零构建嵌入式远程Shell,提升跨地域协作效率
构建嵌入式远程Shell的文章概述: - 目标:解决嵌入式软件测试中的远程调试难题,提供轻量级解决方案。 - 功能:包括远程交互、命令执行与反馈,强调多客户端并发连接和稳定性。 - 设计:基于Socket服务端架构,使用I/O多路复用和popen函数,确保命令执行与结果反馈。 - 需求:支持命令解析、执行和结果回传,考虑网络不稳定情况下的连接保持。 - 安全性:仅限内部调试,未详细讨论加密等安全措施。 - 实现:关注点在Socket服务端程序,包括监听、数据过滤和命令处理。 - 测试:通过 Telnet 和Windows网络助手验证连接和命令执行功能。
321 68
|
6月前
|
Linux Shell 文件存储
【Linux技术专题】「夯实基本功系列」带你一同学习和实践操作Linux服务器必学的Shell指令(深入df和dh的区别和探索)
【Linux技术专题】「夯实基本功系列」带你一同学习和实践操作Linux服务器必学的Shell指令(深入df和dh的区别和探索)
186 1
|
3月前
|
存储 分布式计算 监控