【Spark】(task1)PySpark基础数据处理

简介: )Scala 是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。Scala 运行在 Java 虚拟机上,并兼容现有的 Java 程序。

一、Spark介绍

hadoop生态圈:

image.png

1.1 Scala和PySpark

(1)Scala 是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。

Scala 运行在 Java 虚拟机上,并兼容现有的 Java 程序。

Scala 源代码被编译成 Java 字节码,所以它可以运行于 JVM 之上,并可以调用现有的 Java 类库。

(2)Apache Spark是用 Scala编程语言 编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,也可以使用Python编程语言中的 RDD 。

(3)PySpark提供了 PySpark Shell,它将Python API链接到spark核心并初始化Spark上下文。将Python与Spark集成就对数据科学研究更加方便。

Spark的开发语言是Scala,这是Scala在并行和并发计算方面优势的体现,这是微观层面函数式编程思想的一次胜利。此外,Spark在很多宏观设计层面都借鉴了函数式编程思想,如接口、惰性求值和容错等。

1.2 Spark原理

Spark是业界主流的大数据处理利器。

分布式:指的是计算节点之间不共享内存,需要通过网络通信的方式交换数据。

Spark 是一个分布式计算平台。Spark 最典型的应用方式就是建立在大量廉价的计算节点上,这些节点可以是廉价主机,也可以是虚拟的 Docker Container(Docker 容器)。

Spark 的架构图中:

Spark 程序由 Manager Node(管理节点)进行调度组织

由 Worker Node(工作节点)进行具体的计算任务执行

最终将结果返回给 Drive Program(驱动程序)。

在物理的 Worker Node 上,数据还会分为不同的 partition(数据分片),可以说 partition 是 Spark 的基础数据单元。

image.png

图1 Spark架构图

Spark 计算集群能够比传统的单机高性能服务器具备更强大的计算能力,就是由这些成百上千,甚至达到万以上规模的工作节点并行工作带来的。

1.3 一个具体栗子

那在执行一个具体任务的时候,Spark 是怎么协同这么多的工作节点,通过并行计算得出最终的结果呢?这里我们用一个任务来解释一下 Spark 的工作过程。

一个具体任务过程:

(1)先从本地硬盘读取文件 textFile;

(2)再从分布式文件系统 HDFS 读取文件 hadoopFile;

(3)然后分别对它们进行处理;

(4)再把两个文件按照 ID 都 join 起来得到最终的结果。

在 Spark 平台上处理这个任务的时候,会将这个任务拆解成一个子任务 DAG(Directed Acyclic Graph,有向无环图),再根据 DAG 决定程序各步骤执行的方法。从图 2 中可以看到,这个 Spark 程序分别从 textFile 和 hadoopFile 读取文件,再经过一系列 map、filter 等操作后进行 join,最终得到了处理结果。

image.png

图2 某Spark程序的任务有向无环图

最关键的过程是要理解哪些是可以纯并行处理的部分,哪些是必须 shuffle(混洗)和 reduce 的部分:这里的 shuffle 指的是所有 partition 的数据必须进行洗牌后才能得到下一步的数据,最典型的操作就是图 2 中的 groupByKey 操作和 join 操作。以 join 操作为例,必须对 textFile 数据和 hadoopFile 数据做全量的匹配才可以得到 join 后的 dataframe(Spark 保存数据的结构)。而 groupByKey 操作则需要对数据中所有相同的 key 进行合并,也需要全局的 shuffle 才能完成。

与之相比,map、filter 等操作仅需要逐条地进行数据处理和转换,不需要进行数据间的操作,因此各 partition 之间可以完全并行处理。

在得到最终的计算结果之前,程序需要进行 reduce 的操作,从各 partition 上汇总统计结果,随着 partition 的数量逐渐减小,reduce 操作的并行程度逐渐降低,直到将最终的计算结果汇总到 master 节点(主节点)上。可以说,shuffle 和 reduce 操作的触发决定了纯并行处理阶段的边界。

image.png

图3 被shuffle操作分割的DAG stages

注意:

(1)shuffle 操作需要在不同计算节点之间进行数据交换,非常消耗计算、通信及存储资源,因此 shuffle 操作是 spark 程序应该尽量避免的。shuffle可以理解为一个串行操作,需要等到在此之前的并行工作完成之后才可以顺序开始。

(2)简述Spark 的计算过程:Stage 内部数据高效并行计算,Stage 边界处进行消耗资源的 shuffle 操作或者最终的 reduce 操作。

二、安装方式

Windows 10:不适合开发程序,因为不支持命令行工具、隐藏坑较多、解决方案的资料较少

Windows Subsystem Linux (WSL):需要安装较多软件和配置较多环境变量,非常麻烦

ubuntu / CentOS:未尝试,但与WSL比较相似

docker:简单、高效、可迁移

docker方式(在ubuntu环境):

安装docker:curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun;

拉取镜像:docker pull jupyter/pyspark-notebook

创建容器:

docker run \
    -d \
    -p 8022:22 \
    -p 4040:4040 \
    -v /home/fyb:/data \
    -e GRANT_SUDO=yes \
    --name myspark \
    jupyter/pyspark-notebook

配置docker容器的SSH登录

安装openssh-server等常用软件:apt update && apt install openssh-server htop tmux

设置允许root通过ssh登录:echo "PermitRootLogin yes" >> /etc/ssh/sshd_config

重启ssh服务:service ssh --full-restart,设置root用户密码:passwd root

测试docker容器内的ssh是否设置成功:ssh root@127.0.0.1 -p 8022

容器内的配置python环境:

以root用户登录SSH会话后,安装python依赖工具:apt install pip

安装PySpark依赖包:pip3 install pyspark numpy pandas tqdm

测试是否正确安装并执行了全部修改:python3 /usr/local/spark/examples/src/main/python/pi.py

三、测试是否安装成功

四、Spark程序的模块分类

image.png

五、数据处理任务

5.1 使用Python链接Spark环境

import pandas as pd
from pyspark.sql import SparkSession
# 创建spark应用 mypyspark
spark = SparkSession.builder.appName('mypyspark').getOrCreate()

5.2 创建dateframe数据

这里和pandas等工具类似,创建表时注意这里的表头组成的list列表,放在数据的后面。

test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
                            ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
                            ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],                           
                             ['number','class','language','math','english','physic','chemical'])
test.show(5)

image.png

5.3 用spark执行以下逻辑:找到数据行数、列数

# 查看表前2行
test.head(2)
test.describe().show()
# 列出表头属性
test.columns
# 列出第一行的数据
test.first()
# 数据大小  shape
print('test.shape: %s行 %s列'%(test.count(), len(test.columns)))
# 上面打印出 test.shape: 9行 7列

5.4 用spark筛选class为1的样本

这里可以使用df.filterdf.where

# 方法一
test.filter(test['class'] ==1).show()
# 方法二
test.filter('class == 1' ).show()

image.png

5.5 用spark筛选language >90 或 math> 90的样本

test.filter('language>90 or math>90').show()
test.where('language>90 or math>90').show()
test.filter((test['language']>90)|(test['math']>90)).show()

image.png

任务汇总:

image.png

相关文章
|
7月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
593 0
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
138 1
|
6月前
|
分布式计算 运维 Serverless
EMR Serverless Spark PySpark流任务体验报告
阿里云EMR Serverless Spark是一款全托管的云原生大数据计算服务,旨在简化数据处理流程,降低运维成本。测评者通过EMR Serverless Spark提交PySpark流任务,体验了从环境准备、集群创建、网络连接到任务管理的全过程。通过这次测评,可以看出阿里云EMR Serverless Spark适合有一定技术基础的企业,尤其是需要高效处理大规模数据的场景,但新用户需要投入时间和精力学习和适应。
7193 43
EMR Serverless Spark PySpark流任务体验报告
|
5月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23733 42
|
5月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
296 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
4月前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
96 2
|
5月前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
4月前
|
机器学习/深度学习 分布式计算 数据处理
|
5月前
|
分布式计算 Hadoop Serverless
数据处理的艺术:EMR Serverless Spark实践及应用体验
阿里云EMR Serverless Spark是基于Spark的全托管大数据处理平台,融合云原生弹性与自动化,提供任务全生命周期管理,让数据工程师专注数据分析。它内置高性能Fusion Engine,性能比开源Spark提升200%,并有成本优化的Celeborn服务。支持计算存储分离、OSS-HDFS兼容、DLF元数据管理,实现一站式的开发体验和Serverless资源管理。适用于数据报表、科学项目等场景,简化开发与运维流程。用户可通过阿里云控制台快速配置和体验EMR Serverless Spark服务。
|
6月前
|
分布式计算 运维 Serverless
通过Serverless Spark提交PySpark流任务的实践体验
EMR Serverless Spark服务是阿里云推出的一种全托管、一站式的数据计算平台,旨在简化大数据计算的工作流程,让用户更加专注于数据分析和价值提炼,而非基础设施的管理和运维。下面就跟我一起通过Serverless Spark提交PySpark流任务吧。
378 1