PySpark|RDD编程基础

简介: PySpark数据结构RDD编程基础

RDD(弹性分布式数据集)

RDD是Spark中最基本的数据抽象,其实就是分布式的元素集合。RDD有三个基本的特性:分区、不可变、并行操作。

分区:每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

不可变:不可变性是指每个 RDD 都是只读的,它所包含的分区信息是不可变的。由于已有的 RDD 是不可变的,所以我们只有对现有的 RDD 进行转化 (Transformation) 操作,才能得到新的 RDD ,一步一步的计算出我们想要的结果。

并行操作:因为 RDD 的分区特性,所以其天然支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的 RDD。

RDD创建

在Pyspark中我们可以通过两种方式来进行RDD的创建,RDD是一种无schema的数据结构,所以我们几乎可以混合使用任何类型的数据结构:tuple、dict、list都可以使用。

  • parallelize()

直接使用数据容器创建RDD。

data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye', 4),
                       ('Albert', 12), ('Amber', 9)])
  • textFile()

引用位于本地或者外部的某个文件(或者多个文件)。

data_from_file = sc.\
    textFile(
        'xxxxx',
        4)

RDD转换

我们可以通过转换操作来进行数据集的调整,包括映射、筛选、连接、转换数据集中的值等操作。

  • map()

和python中的map映射相同,经常配合lambda使用。

data_2020 = data_from_file_conv.map(lambda row: int(row[16]))
  • filter()

从数据集中选择元素,该元素符合特定的标准。

data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
  • flatMap()

和map()相似,但是返回一个扁平的列表(可以过滤一些格式不正确的记录)。

data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
  • distinct()

返回指定列中不同值的列表

distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
  • sample()

返回数据集的随机样本:
参数1:指定采样是否应该替换;
参数2:定义返回数据的分数(百分之多少);

参数3:随机种子。

fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
  • leftOuterJoin()

左链接

rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
  • join()

只留下能够关联的内容

rdd4 = rdd1.join(rdd2)
  • intersection()

返回两个RDD中相等的记录

rdd5 = rdd1.intersection(rdd2)
  • repartition()

重新对数据进行分区

rdd1 = rdd1.repartition(4)

RDD操作

和上面的转换不同,操作执行数据集上的计划任务。

  • take()

返回单个数据分区的前n行。

data_first = data_from_file_conv.take(1)
data_first
  • collect()

将所有RDD的元素返回给驱动程序。

rdd5.collect()
  • reduce()

使用指定的方法减少RDD中的元素。

rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
  • count()

统计RDD中元素的个数。

data_reduce.count()
  • countByKey()

获取不同键的计数。

data_key.countByKey().items()
  • saveAsTextFile

让RDD保存为文本文件。

data_key.saveAsTextFile('xxx')
  • foreach()

对RDD中的每个元素,使用迭代的方式应用相同的函数。

def f(x): 
    print(x)

data_key.foreach(f)

总结

在这里插入图片描述

相关文章
|
7月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
215 2
|
7月前
|
存储 分布式计算 Hadoop
Spark 【RDD编程(一)RDD编程基础】
Spark 【RDD编程(一)RDD编程基础】
|
7月前
|
缓存 分布式计算 Java
Spark【RDD编程(二)RDD编程基础】
Spark【RDD编程(二)RDD编程基础】
|
缓存 分布式计算 Spark
Spark RDD开发
开发步骤
75 0
|
存储 缓存 分布式计算
Spark RDD编程基础(Scala版)
Spark RDD编程基础(Scala版)
|
SQL 缓存 分布式计算
PySpark|比RDD更快的DataFrame
PySpark基础数据结构讲解
PySpark|比RDD更快的DataFrame
|
机器学习/深度学习 人工智能 分布式计算
PySpark数据分析基础:PySpark原理详解
PySpark数据分析基础:PySpark原理详解
432 1
PySpark数据分析基础:PySpark原理详解
|
SQL 机器学习/深度学习 分布式计算
PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解
PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解
2255 0
PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解
|
分布式计算 Hadoop Java
【Spark】(三)Spark 架构原理和RDD使用详解2
【Spark】(三)Spark 架构原理和RDD使用详解2
153 0
【Spark】(三)Spark 架构原理和RDD使用详解2
|
存储 缓存 分布式计算
【Spark】(三)Spark 架构原理和RDD使用详解1
【Spark】(三)Spark 架构原理和RDD使用详解1
226 0
【Spark】(三)Spark 架构原理和RDD使用详解1