PySpark|RDD编程基础

简介: PySpark数据结构RDD编程基础

RDD(弹性分布式数据集)

RDD是Spark中最基本的数据抽象,其实就是分布式的元素集合。RDD有三个基本的特性:分区、不可变、并行操作。

分区:每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。

不可变:不可变性是指每个 RDD 都是只读的,它所包含的分区信息是不可变的。由于已有的 RDD 是不可变的,所以我们只有对现有的 RDD 进行转化 (Transformation) 操作,才能得到新的 RDD ,一步一步的计算出我们想要的结果。

并行操作:因为 RDD 的分区特性,所以其天然支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的 RDD。

RDD创建

在Pyspark中我们可以通过两种方式来进行RDD的创建,RDD是一种无schema的数据结构,所以我们几乎可以混合使用任何类型的数据结构:tuple、dict、list都可以使用。

  • parallelize()

直接使用数据容器创建RDD。

data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye', 4),
                       ('Albert', 12), ('Amber', 9)])
  • textFile()

引用位于本地或者外部的某个文件(或者多个文件)。

data_from_file = sc.\
    textFile(
        'xxxxx',
        4)

RDD转换

我们可以通过转换操作来进行数据集的调整,包括映射、筛选、连接、转换数据集中的值等操作。

  • map()

和python中的map映射相同,经常配合lambda使用。

data_2020 = data_from_file_conv.map(lambda row: int(row[16]))
  • filter()

从数据集中选择元素,该元素符合特定的标准。

data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
  • flatMap()

和map()相似,但是返回一个扁平的列表(可以过滤一些格式不正确的记录)。

data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
  • distinct()

返回指定列中不同值的列表

distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
  • sample()

返回数据集的随机样本:
参数1:指定采样是否应该替换;
参数2:定义返回数据的分数(百分之多少);

参数3:随机种子。

fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)
  • leftOuterJoin()

左链接

rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
  • join()

只留下能够关联的内容

rdd4 = rdd1.join(rdd2)
  • intersection()

返回两个RDD中相等的记录

rdd5 = rdd1.intersection(rdd2)
  • repartition()

重新对数据进行分区

rdd1 = rdd1.repartition(4)

RDD操作

和上面的转换不同,操作执行数据集上的计划任务。

  • take()

返回单个数据分区的前n行。

data_first = data_from_file_conv.take(1)
data_first
  • collect()

将所有RDD的元素返回给驱动程序。

rdd5.collect()
  • reduce()

使用指定的方法减少RDD中的元素。

rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
  • count()

统计RDD中元素的个数。

data_reduce.count()
  • countByKey()

获取不同键的计数。

data_key.countByKey().items()
  • saveAsTextFile

让RDD保存为文本文件。

data_key.saveAsTextFile('xxx')
  • foreach()

对RDD中的每个元素,使用迭代的方式应用相同的函数。

def f(x): 
    print(x)

data_key.foreach(f)

总结

在这里插入图片描述

相关文章
|
8月前
|
缓存 分布式计算 并行计算
Spark3:RDD概述
Spark3:RDD概述
92 0
|
2月前
|
存储 分布式计算 Hadoop
Spark 【RDD编程(一)RDD编程基础】
Spark 【RDD编程(一)RDD编程基础】
|
2月前
|
缓存 分布式计算 Java
Spark【RDD编程(二)RDD编程基础】
Spark【RDD编程(二)RDD编程基础】
|
8月前
|
存储 分布式计算 并行计算
182 Spark RDD概述
182 Spark RDD概述
28 0
|
缓存 分布式计算 Spark
Spark RDD开发
开发步骤
59 0
|
存储 缓存 分布式计算
Spark RDD编程基础(Scala版)
Spark RDD编程基础(Scala版)
|
SQL 缓存 分布式计算
PySpark|比RDD更快的DataFrame
PySpark基础数据结构讲解
PySpark|比RDD更快的DataFrame
|
机器学习/深度学习 人工智能 分布式计算
PySpark数据分析基础:PySpark原理详解
PySpark数据分析基础:PySpark原理详解
350 1
PySpark数据分析基础:PySpark原理详解
|
分布式计算 Hadoop Java
【Spark】(三)Spark 架构原理和RDD使用详解2
【Spark】(三)Spark 架构原理和RDD使用详解2
132 0
【Spark】(三)Spark 架构原理和RDD使用详解2
|
存储 缓存 分布式计算
【Spark】(三)Spark 架构原理和RDD使用详解1
【Spark】(三)Spark 架构原理和RDD使用详解1
177 0
【Spark】(三)Spark 架构原理和RDD使用详解1

热门文章

最新文章

  • 1
    流量控制系统,用正则表达式提取汉字
    27
  • 2
    Redis09-----List类型,有序,元素可以重复,插入和删除快,查询速度一般,一般保存一些有顺序的数据,如朋友圈点赞列表,评论列表等,LPUSH user 1 2 3可以一个一个推
    27
  • 3
    Redis08命令-Hash类型,也叫散列,其中value是一个无序字典,类似于java的HashMap结构,Hash结构可以将对象中的每个字段独立存储,可以针对每字段做CRUD
    27
  • 4
    Redis07命令-String类型字符串,不管是哪种格式,底层都是字节数组形式存储的,最大空间不超过512m,SET添加,MSET批量添加,INCRBY age 2可以,MSET,INCRSETEX
    28
  • 5
    S外部函数可以访问函数内部的变量的闭包-闭包最简单的用不了,闭包是内层函数+外层函数的变量,简称为函数套函数,外部函数可以访问函数内部的变量,存在函数套函数
    25
  • 6
    Redis06-Redis常用的命令,模糊的搜索查询往往会对服务器产生很大的压力,MSET k1 v1 k2 v2 k3 v3 添加,DEL是删除的意思,EXISTS age 可以用来查询是否有存在1
    31
  • 7
    Redis05数据结构介绍,数据结构介绍,官方网站中看到
    22
  • 8
    JS字符串数据类型转换,字符串如何转成变量,+号只要有一个是字符串,就会把另外一个转成字符串,- * / 都会把数据转成数字类型,数字型控制台是蓝色,字符型控制台是黑色,
    21
  • 9
    JS数组操作---删除,arr.pop()方法从数组中删除最后一个元素,并返回该元素的值,arr.shift() 删除第一个值,arr.splice()方法,删除指定元素,arr.splice,从第一
    21
  • 10
    定义好变量,${age}模版字符串,对象可以放null,检验数据类型console.log(typeof str)
    20