本节书摘来自华章计算机《Scala机器学习》一书中的第3章,第3.2节,作者:[美] 亚历克斯·科兹洛夫(Alex Kozlov),更多章节内容可以访问云栖社区“华章计算机”公众号查看。
3.2 理解Spark的架构
并行化是将工作负载划分为在不同线程或不同节点上执行的子任务。下面介绍Spark实现并行化的原理,以及它如何管理子任务的执行和子任务之间的通信。
3.2.1 任务调度
Spark工作负载的划分由弹性分布式数据集(Resilient Distributed Dataset,RDD)的分区数决定,这是Spark的基本抽象和管道结构。RDD是一种可并行操作的、不可变元素的分区集合。具体细节可能取决于Spark的运行模式,图3-2为Spark任务/资源调度的示意图。
图3-2 通用的Spark任务调度示意图。尽管在图中没有明确标识,Spark Context通常会在端口4040上打开一个HTTP UI(并发情形将打开4041、4042等),在任务执行期间会一直这样。Spark Master UI的端口通常是8080(虽然在CDH中改为了18080),而Worker UI的端口通常是7078。每个节点可以运行多个执行器,每个执行器可运行多个任务
读者会发现Spark和Hadoop有很多参数。其中一些指定为环境变量(保存在$SPARK_HOME / conf / spark-env.sh文件中),但有些被当作命令行参数。此外,一些文件(其名称是预先定义好的)含有改变Spark行为的参数,比如core-site.xml文件。这可能会令人困惑,本章和后面的章节会尽可能多地介绍这方面的内容。如果使用了Hadoop分布式文件系统(HDFS),则core-site.xml和hdfs-site.xml文件将包含HDFS主节点的建议和规范。在CLASSPATH Java进程上要求加载这些文件,这可通过指定HADOOP_CONF_DIR或SPARK_CLASSPATH环境变量来设置。通常由于有源代码,有时需要通过查看源代码来了解各种参数的含义,所以在笔记本电脑上保留一个源代码树的副本是不错的做法。
集群中的每个节点可以运行一个或多个执行器,每个执行器可以调度一系列任务来执行Spark操作。Spark驱动负责调度执行,并与集群调度器(如Mesos或YARN)一起工作,实现对可用资源的调度。Spark驱动通常在客户端计算机上运行,但在最新版本中,它也可以在集群的集群管理器下运行。YARN和Mesos都有动态管理每个节点上并发运行的多个执行器的能力,并能对资源进行约束。
在独立模式下,Spark主节点要执行集群调度器的工作,这可能在分配资源方面效率较低,但它总比缺少预配置的Mesos或YARN要好。Spark标准发行版在sbin目录中有用来启动具有独立模式的Spark的shell脚本。Spark主节点和驱动会直接与一个或多个运行在单个节点上的Spark worker进行通信。一旦主节点运行,可用如下命令来启动Spark shell:
注意,总可在本地模式下运行Spark,也就是说,所有任务将通过在单个JVM中指定--master local [2]来执行,其中2是线程数,至少为2。实际上,本书经常会使用本地模式来运行一些小例子。
从Spark的角度来看,Spark shell是一个应用程序。一旦开始一个Spark应用程序,便能在Spark Master UI中的“运行的应用程序”下看到它(或在相应的集群管理器中),这会重定向到Spark应用程序HTTP UI,其端口为4040,在这里可以看到子任务执行的时间线和其他重要属性,如环境设置,类路径(classpath),传递到JVM的参数和有关资源使用的信息(参见图3-3):
在Spark的本地模式和集群模式之间切换的方法有:采用命令行选项--master;设置一个MASTER环境变量;修改spark-defaults.conf(该文件给出了执行期间的类路径);直接使用Scala中SparkConf对象上的setters方法(这将在后面介绍)。
图3-3 在独立模式下,Spark驱动的UI的时间分解
最常用的主节点UI端口是8080,应用UI端口是4040。其他Spark端口都汇总在下表中。
此外,在随源码发行的docs子目录中还有一些文档,但可能已经过期。
3.2.2 Spark的组件
自Spark发布以来,已经有多个基于Spark的缓存RDD功能编写的应用,比如Shark、Spork(Pig on Spark)、图形库(GraphX、GraphFrame)、流媒体、MLlib等,其中一些将在本章和以后的章节中讨论。
本节将主要介绍用来收集、存储和分析数据的Spark架构组件。第2章介绍过一个更完整的数据生命周期架构,而下面只介绍Spark特有的组件:
图3-4 Spark的组件和架构
3.2.3 MQTT、ZeroMQ、Flume和Kafka
这些组件采用不同的方法将数据从一个地方可靠移动到另一个地方。这些组件通常都会实现一个发布、订阅模型,其中多个写入器(writer)和读取器(reader)采用不同的保障机制从相同队列写入和读取。著名的Flume是第一个分布式日志和事件管理系统,但它慢慢被Kafka取代,Kafka由LinkedIn开发,是一个功能齐全的发布-订阅分布式消息队列,可在分布式节点上进行持久存储。上一章简要介绍了Flume和Kafka。Flume配置基于文件,通常用于将消息从一个Flume源(source)传递到一个或多个Flume接收器。其中一个常见的源是netcat,它会监听来自各个端口上的原始数据。例如,以下配置描述了一个代理接收数据,每30秒(默认)将数据写入HDFS:
此文件可在本书提供的源代码的chapter03/conf目录中找到。可下载并启动Flume代理(用http://flume.apache.org/download.html所提供的内容来检查MD5总和):
现在可在单独的窗口键入netcat命令将文本发送给Flume代理:
Flume代理将首先创建一个以tmp为后缀名的文件,然后将其重命名为一个没有扩展名的文件(文件扩展名可以用于过滤掉正在写入的文件):
这里的每一行由一个Unix时间(以毫秒为单位)和接收的数据构成。在这种情况下可将数据放入HDFS,通过Spark / Scala程序来分析存储在HDFS上的这些数据,并排除那些以文件名*.tmp形式写入的文件。Spark还有一些平台支持流,如果读者对一些最新、最有价值的平台感兴趣,可以参考本章接下来几节的内容。
3.2.4 HDFS、Cassandra、S3和Tachyon
HDFS、Cassandra、S3和Tachyon采用不同的方式来持久保存数据,并采用不同的方式来保障计算节点所需的资源。HDFS是Hadoop的一部分,它实现的分布式存储是Hadoop生态系统中多个产品的后台(backend)。HDFS将每个文件划分成大小为128 MB的块,并将每个块至少存储在三个节点上。尽管HDFS是可靠的,并且支持HA,但是HDFS存储的效率低,特别是用于机器学习时更是如此。Cassandra是一个通用键/值存储,它能存储一行的多个副本,并且可通过配置来支持不同级别的一致性,以优化读取或写入速度。相对于HDFS模型而言,Cassandra的优点是没有中央主节点,它通过共识算法来进行读写。但有时Cassandra可能不稳定。S3是Amazon存储:数据存储在群集外,这会影响I/O速度。最近开发的Tachyon声称可利用节点的内存来优化对跨节点工作集的访问。
此外还有不断在开发的新后台,例如来自Cloudera的Kudu(http://getkudu.io/kudu.pdf)和来自GridGain的Ignite文件系统(IGFS)(http://apacheignite.gridgain.org/v1.0/docs/igfs)。它们都是基于Apache许可协议的开源项目。
3.2.5 Mesos、YARN和Standalone
正如之前提到的,Spark能运行在不同的集群资源调度器下。这些在集群上的调度器是为了调度Spark的容器和任务而具体实现的。调度器可视为集群核心,其功能与操作系统内核的调度器相似:资源分配、调度、I/O优化、应用服务和UI。
Mesos是最早的集群管理器之一,它的设计原则与Linux内核相同,只是抽象级别不同。Mesos的从节点运行在每台计算机上,并为整个数据中心和云环境中的资源管理和调度提供API。Mesos是用C++编写的。
YARN是雅虎最近开发的集群管理器。YARN中的每个节点运行节点管理器,它可与运行在单独的节点上的资源管理器通信。资源管理器调度任务来满足内存和CPU约束。Spark驱动本身可在集群中运行,这称为YARN的集群模式。也可在客户端模式下运行,这时只有Spark执行器运行在集群中,而调度Spark管道的驱动所运行的计算机与Spark shell或提交程序的计算机是同一台机器。在这种情况下,Spark执行器将通过随机打开的端口与本地主机通信。YARN是用Java编写的,这会出现不可预测的GC暂停,从而导致较重的延迟长尾。
如果这些资源调度程序都不可用,则独立模式会在每个节点上启动org.apache.spark.deploy.worker.Worker进程,该进程会与Spark 主节点进程通信,主节点进程会以org.apache.spark.deploy.master.Master运行。工作进程完全由主节点管理并可以运行多个执行器和任务(见图3-2)。
在具体的实现中,建议通过驱动器的UI来跟踪程序的并行性和所需资源。如果需要,可调整并行性、可用内存以及增加并行性。下一节将会开始介绍如何用Spark中的Scala来解决不同的问题。