大数据实战之spark安装部署

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 楔子我是在2013年底第一次听说Spark,当时我对Scala很感兴趣,而Spark就是使用Scala编写的。一段时间之后,我做了一个有趣的数据科学项目,它试着去预测在泰坦尼克号上幸存。

楔子

我是在2013年底第一次听说Spark,当时我对Scala很感兴趣,而Spark就是使用Scala编写的。一段时间之后,我做了一个有趣的数据科学项目,它试着去预测在泰坦尼克号上幸存。对于进一步了解Spark内容和编程来说,这是一个很好的方式。对于任何有追求的、正在思考如何着手 Spark 的程序员,我都非常推荐这个项目。


今天,Spark已经被很多巨头使用,包括Amazon、eBay以及Yahoo!。很多组织都在拥有成千上万节点的集群上运行Spark。根据Spark FAQ,已知的最大的Spark集群拥有超过8000个节点。Spark确实是一个值得好好考虑和学习的技术。

Apache Spark是什么?一个简单介绍

Spark是一个Apache项目,它被标榜为“快如闪电的集群计算”。它拥有一个繁荣的开源社区,并且是目前最活跃的Apache项目。

Spark提供了一个更快、更通用的数据处理平台。和Hadoop相比,Spark可以让你的程序在内存中运行时速度提升100倍,或者在磁盘上运行时速度提升10倍。去年,在100 TB Daytona GraySort比赛中,Spark战胜了Hadoop,它只使用了十分之一的机器,但运行速度提升了3倍。Spark也已经成为 针对 PB 级别数据排序的最快的开源引擎

wKiom1nN2HbgiMfVAAcb4UgtLAA558.png-wh_50


wKiom1nN2IPirpi-AAIYyMtD3QE107.png-wh_50

Spark Core

Spark Core是一个基本引擎,用于大规模并行和分布式数据处理。它主要负责:

  • 内存管理和故障恢复

  • 在集群上安排、分布和监控作业

  • 和存储系统进行交互

Spark引入了一个称为弹性分布式数据集(RDD,Resilient Distributed Dataset)的概念,它是一个不可变的、容错的、分布式对象集合,我们可以并行的操作这个集合。RDD可以包含任何类型的对象,它在加载外部数据集或者从驱动应用程序分发集合时创建。

RDD支持两种操作类型:

  • 转换是一种操作(例如映射、过滤、联接、联合等等),它在一个RDD上执行操作,然后创建一个新的RDD来保存结果。

  • 行动是一种操作(例如归并、计数、第一等等),它在一个RDD上执行某种计算,然后将结果返回。

在Spark中,转换是“懒惰”的,也就是说它们不会立刻计算出结果。相反,它们只是“记住”要执行的操作以及要操作的数据集(例如文件)。只有当行为被调用时,转换才会真正的进行计算,并将结果返回给驱动器程序。这种设计让Spark运行得更有效率。例如,如果一个大文件要通过各种方式进行转换操作,并且文件被传递给第一个行为,那么Spark只会处理文件的第一行内容并将结果返回,而不会处理整个文件。

默认情况下,当你在经过转换的RDD上运行一个行为时,这个RDD有可能会被重新计算。然而,你也可以通过使用持久化或者缓存的方法,将一个RDD持久化从年初在内存中,这样,Spark就会在集群上保留这些元素,当你下一次查询它时,查询速度会快很多。

SparkSQL

SparkSQL是Spark的一个组件,它支持我们通过SQL或者Hive查询语言来查询数据。它最初来自于Apache Hive项目,用于运行在Spark上(来代替MapReduce),现在它已经被集成到Spark堆中。除了针对各种各样的数据源提供支持,它还让代码转换与SQL查询编织在一起变得可能,这最终会形成一个非常强大的工具。下面是一个兼容Hive的查询示例:

wKiom1nN2KqR_ECaAADBFxQSZrw191.png

Spark Streaming

Spark Streaming支持对流数据的实时处理,例如产品环境web服务器的日志文件(例如Apache Flume和HDFS/S3)、诸如Twitter的社交媒体以及像Kafka那样的各种各样的消息队列。在这背后,Spark Streaming会接收输入数据,然后将其分为不同的批次,接下来Spark引擎来处理这些批次,并根据批次中的结果,生成最终的流。

MLlib

MLlib是一个机器学习库,它提供了各种各样的算法,这些算法用来在集群上针对分类、回归、聚类、协同过滤等(可以在 machine learning 上查看Toptal的文章,来获取更过的信息)。其中一些算法也可以应用到流数据上,例如使用普通最小二乘法或者K均值聚类(还有更多)来计算线性回归。Apache Mahout(一个针对Hadoop的机器学习库)已经脱离MapReduce,转而加入Spark MLlib。

GraphX

wKiom1nN2MGw4ptXAADWvINZ650003.png


GraphX是一个库,用来处理图,执行基于图的并行操作。它针对ETL、探索性分析和迭代图计算提供了统一的工具。除了针对图处理的内置操作,GraphX还提供了一个库,用于通用的图算法,例如PageRank。

如何使用Apache Spark:事件探测用例

既然我们已经回答了“Apache Spark是什么?”这个问题,接下来让我们思考一下,使用Spark来解决什么样的问题或者挑战最有效率。

最近,我偶然看到了一篇关于 通过分析Twitter流的方式来探测地震 的文章。它展示了这种技术可以比日本气象厅更快的通知你日本哪里发生了地震。虽然那篇文章使用了不同的技术,但我认为这是一个很好的示例,可以用来说明我们如何通过简单的代码片段,在不需要”胶水代码“的情况下应用Spark。

首先,我们需要处理tweet,将那些和”地震“或”震动“等相关的内容过滤出来。我们可以使用Spark Streaming的方式很容易实现这一目标,如下所示:


1

2

TwitterUtils.createStream(...)

            .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

然后,我们需要在tweets上运行一些语义分析,来确定它们是否代表当前发生了地震。例如,像“地震!”或者“现在正在震动”这样的tweets,可能会被认为是正向匹配,而像“参加一个地震会议”或者“昨天的地震真可怕”这样的tweets,则不是。这篇文章的作者使用了一个支持向量机(support vector machine, SVM)来实现这一点。我们在这里使用同样的方式,但也可以试一下 流版本。一个使用了MLlib的代码示例如下所示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

// We would prepare some earthquake tweet data and load it in LIBSVM format.

val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt")

 

// Split data into training (60%) and test (40%).

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0).cache()

val test = splits(1)

 

// Run training algorithm to build the model

val numIterations = 100

val model = SVMWithSGD.train(training, numIterations)

 

// Clear the default threshold.

model.clearThreshold()

 

// Compute raw scores on the test set.

val scoreAndLabels = test.map { point =>

  val score = model.predict(point.features)

  (score, point.label)

}

 

// Get evaluation metrics.

val metrics = new BinaryClassificationMetrics(scoreAndLabels)

val auROC = metrics.areaUnderROC()

 

println("Area under ROC = " + auROC)


如果对于这个模型的预测比例满意,我们可以继续往下走,无论何时发现地震,我们都要做出反应。为了检测一个地震,我们需要在一个指定的时间窗口内(如文章中所述)有一定数量(例如密度)的正向tweets。请注意,对于带有Twitter位置服务信息的tweets来说,我们还能够从中提取地震的位置信息。有了这个只是以后,我们可以使用SparkSQL来查询现有的Hive表(保存那些对接收地震通知感兴趣的用户)来获取用户的邮箱地址,并向他们发送一些个性化的警告邮件,如下所示:

1

2

3

4

5

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

// sendEmail is a custom function

sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email")

          .collect().foreach(sendEmail)


1.实战文档如下


Spark下载

为了方便,我直接是进入到了/usr/src文件夹下面进行下载spark-2.1.1

 wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz

 

Spark安装之前的准备

文件的解压与改名

tar -zxf spark-2.1.1-bin-hadoop2.7.tgz

rm -rf spark-2.1.1-bin-hadoop2.7.tgz

为了我后面方便配置spark,在这里我把文件夹的名字给改了

mv spark-2.1.1-bin-hadoop2.7 spark-2.1.1

 

配置环境变量

vi /etc/profile

在最尾巴加入

export SPARK_HOME=/usr/src/spark-2.1.1

export PATH=$PATH:$SPARK_HOME/bin


wKiom1nNz_myaTrRAABWcQLqdHU332.png-wh_50


配置Spark环境

打开spark-2.1.1文件夹

cd spark-2.1.1

此处需要配置的文件为两个
spark-env.shslaves

wKiom1nN0HHCHvJ_AABhfCQE5zo380.png-wh_50


首先我们把缓存的文件spark-env.sh.template改为spark识别的文件spark-env.sh

cp conf/spark-env.sh.template conf /spark-env.sh

修改spark-env.sh文件

vi conf/spark-env.sh


注意!变量按照个人条件情况路径配置


在最尾巴加入

export JAVA_HOME=/usr/java/jdk1.7.0_141

export SCALA_HOME=/usr/scala-2.1.1

export HADOOP_HOME=/usr/local/hadoop-2.7.2

export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.2/etc/hadoop

export SPARK_MASTER_IP=SparkMaster

export SPARK_WORKER_MEMORY=4g

export SPARK_WORKER_CORES=2

export SPARK_WORKER_INSTANCES=1


变量说明

  • JAVA_HOME:Java安装目录

  • SCALA_HOME:Scala安装目录

  • HADOOP_HOME:hadoop安装目录

  • HADOOP_CONF_DIR:hadoop集群的配置文件的目录

  • SPARK_MASTER_IP:spark集群的Master节点的ip地址

  • SPARK_WORKER_MEMORY:每个worker节点能够最大分配给exectors的内存大小

  • SPARK_WORKER_CORES:每个worker节点所占有的CPU核数目

  • SPARK_WORKER_INSTANCES:每台机器上开启的worker节点的数目


修改slaves文件

vi conf/slaves 或者

wKiom1nN0drDJ8XeAAAHN8pzTQA573.png-wh_50


在最后面修成为

SparkWorker1

SparkWorker2

wKioL1nN0fvh6E_AAACVeoYfOAE016.png-wh_50


注意!如果是dan台PC可以不用同步rsync


同步SparkWorker1SparkWorker2的配置

在此我们使用rsync命令

rsync -av /usr/src/spark-2.1.1/ SparkWorker1:/usr/src/spark-2.1.1/

rsync -av /usr/src/spark-2.1.1/ SparkWorker2:/usr/src/spark-2.1.1/


启动Spark集群

因为我们只需要使用hadoopHDFS文件系统,所以我们并不用把hadoop全部功能都启动。


启动hadoopHDFS文件系统

start-dfs.sh


但是在此会遇到一个情况,就是使用start-dfs.sh,启动之后,在SparkMaster已经启动了namenode,但在SparkWorker1SparkWorker2都没有启动了datanode,这里的原因是:datanodeclusterIDnamenodeclusterID不匹配。是因为SparkMaster多次使用了hadoop namenode -format格式化了。


==解决的办法:==

SparkMaster使用

cat /usr/src/hadoop-2.7.2/hdfs/name/current/VERSION

查看clusterID,并将其复制。

 wKioL1nN05eQY8niAACYYrvoRnU359.png-wh_50

SparkWorker1SparkWorker2上使用

vi /usr/src/hadoop-2.7.2/hdfs/name/current/VERSION

将里面的clusterID,更改成为SparkMasterVERSION里面的clusterID

 wKioL1nN06Tw0ERbAACvYoG8Pqw356.png-wh_50

做了以上两步之后,便可重新使用start-dfs.sh开启HDFS文件系统。

 

启动之后使用jps命令可以查看到SparkMaster已经启动了namenodeSparkWorker1SparkWorker2都启动了datanode,说明hadoopHDFS文件系统已经启动了。



启动Spark

因为hadoop/sbin以及spark/sbin均配置到了系统的环境中,它们同一个文件夹下存在同样的start-all.sh文件。最好是打开spark-2.2.0,在文件夹下面打开该文件。

./sbin/start-all.sh

成功打开Spark集群之后可以进入SparkWebUI界面,可以通过

SparkMaster_IP:8080   例:192.168.1.186:8080


wKioL1nN1Jzi1IC9AAB8YEH69vM289.png-wh_50


访问,可见有两个正在运行的Worker节点。


wKioL1nN1Krgxv9ZAADkQacyxeo263.png-wh_50


打开Spark-shell

使用

spark-shell  and  ./bin/spark-shell


wKiom1nN1YiQWNiJAACN-3WgSR8105.png-wh_50


便可打开Sparkshell

同时,因为shell 在运行,我们也可以通过

SparkMaster_IP:4040

访问WebUI查看当前执行的任务。


wKioL1nN1bPADdQTAACdo0E4FNs249.png-wh_50

结言

到此我们的Spark集群就搭建完毕了。搭建spark集群原来知识网络是挺庞大的,涉及到Linux基本操作,设计到ssh,设计到hadoop、Scala以及真正的Spark。在此也遇到不少问题,通过翻阅书籍以及查看别人的blog得到了解决。在此感谢分享知识的人希望自己越努力越幸运!


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
83 6
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
110 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
75 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
69 1
|
1月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
2月前
|
Oracle 大数据 数据挖掘
企业内训|大数据产品运营实战培训-某电信运营商大数据产品研发中心
本课程是TsingtaoAI专为某电信运营商的大数据产品研发中心的产品支撑组设计,旨在深入探讨大数据在电信运营商领域的应用与运营策略。通过密集的培训,从数据的本质与价值出发,系统解析大数据工具和技术的最新进展,深入剖析行业内外的实践案例。课程涵盖如何理解和评估数据、如何有效运用大数据技术、以及如何在不同业务场景中实现数据的价值转化。
61 0
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
289 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
44 2
下一篇
DataWorks