Python大数据之PySpark(六)RDD的操作

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Python大数据之PySpark(六)RDD的操作

RDD的操作

函数分类

  • *Transformation操作只是建立计算关系,而Action 操作才是实际的执行者*
  • Transformation算子
  • 转换算子
  • 操作之间不算的转换,如果想看到结果通过action算子触发
  • Action算子
  • 行动算子
  • 触发Job的执行,能够看到结果信息

Transformation函数

  • 值类型valueType
  • map
  • flatMap
  • filter
  • mapValue

双值类型DoubleValueType

  • intersection
  • union
  • difference
  • distinct

Key-Value值类型

  • reduceByKey
  • groupByKey
  • sortByKey
  • combineByKey是底层API
  • foldBykey
  • aggreateBykey

Action函数

  • collect
  • saveAsTextFile
  • first
  • take
  • takeSample
  • top

基础练习[Wordcount快速演示]

Transformer算子

  • 单value类型代码
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf,SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")#一般在工作中不这么写,直接复制log4j文件
# 2-map操作
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd__map = rdd1.map(lambda x: x * 2)
print(rdd__map.glom().collect())#[2, 4, 6, 8, 10, 12],#[[2, 4, 6], [8, 10, 12]]
# 3-filter操作
print(rdd1.glom().collect())
print(rdd1.filter(lambda x: x > 3).glom().collect())
# 4-flatMap
rdd2 = sc.parallelize(["  hello      you", "hello me  "])
print(rdd2.flatMap(lambda word: re.split("\s+", word.strip())).collect())
# 5-groupBY
x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
print(y.mapValues(list).collect())#[('A', [1, 3]), ('B', [2])]
# 6-mapValue
x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
print(x1.mapValues(f).collect())
  • 双value类型的代码
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# 2-对两个RDD求并集
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
Union_RDD = rdd1.union(rdd2)
print(Union_RDD.collect())
print(rdd1.intersection(rdd2).collect())
print(rdd2.subtract(rdd1).collect())
# Return a new RDD containing the distinct elements in this RDD.
print(Union_RDD.distinct().collect())
print(Union_RDD.distinct().glom().collect())
  • key-Value算子
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# 2-key和value类型算子
# groupByKey
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
rdd3 = rdd1.union(rdd2)
key1 = rdd3.groupByKey()
print("groupByKey:",key1.collect())
#groupByKey:
# [('b', <pyspark.resultiterable.ResultIterable object at 0x7f001c469c40),
# ('c', <pyspark.resultiterable.ResultIterable object at 0x7f001c469310),
# ('a', <pyspark.resultiterable.ResultIterable object at 0x7f001c469a00)]
print(key1.mapValues(list).collect())#需要通过mapValue获取groupByKey的值
print(key1.mapValues(tuple).collect())
# reduceByKey
key2 = rdd3.reduceByKey(lambda x, y: x + y)
print(key2.collect())
# sortByKey
print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())#[(5, 'b'), (1, 'c'), (1, 'a')]
# countByKey
print(rdd3.countByValue())#defaultdict(<class 'int', {('a', 1): 1, ('b', 2): 1, ('c', 1): 1, ('b', 3): 1})

Action算子

  • 部分操作
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# 2-key和value类型算子
# groupByKey
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("c", 1), ("b", 3)])
print(rdd1.first())
print(rdd1.take(2))
print(rdd1.top(2))
print(rdd1.collect())
rdd3 = sc.parallelize([1, 2, 3, 4, 5])
from operator import add
from operator import mul
print(rdd3.reduce(add))
print(rdd3.reduce(mul))
rdd4 = sc.parallelize(range(0, 10))
# 能否保证每次抽样结果是一致的,使用seed随机数种子
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 123))
print(rdd4.takeSample(True, 3, 34))
  • 其他补充算子
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
'''
def f(iterator):  # 【1,2,3】 【4,5】
for x in iterator:  # for x in 【1,2,3】  x=1,2,3 print 1.2.3
print(x)
def f1(iterator):  # 【1,2,3】 【4,5】  sum(1+2+3) sum(4+5)
yield sum(iterator)
if __name__ == '__main__':
# 1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# 2-foreach-Applies a function to all elements of this RDD.
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
print(rdd1.glom().collect())
# def f(x):print(x)
rdd1.foreach(lambda x: print(x))
# 3-foreachPartition--Applies a function to each partition of this RDD.
# 从性能角度分析,按照分区并行比元素更加高效
rdd1.foreachPartition(f)
# 4-map---按照元素进行转换
rdd2 = sc.parallelize([1, 2, 3, 4])
print(rdd2.map(lambda x: x * 2).collect())
# 5-mapPartiton-----按照分区进行转换
# Return a new RDD by applying a function to each partition of this RDD.
print(rdd2.mapPartitions(f1).collect())  # [3, 7]

重要函数

基本函数

  • 基础的transformation
  • 和action操作

分区操作函数

  • mapPartition
  • foreachPartition

分区函数

# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
if __name__ == '__main__':
#TODO:  1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
#TODO:   2-执行重分区函数--repartition
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print("partitions num:",rdd1.getNumPartitions())
print(rdd1.glom().collect())#[[1, 2], [3, 4], [5, 6]]
print("repartition result:")
#TODO:   repartition可以增加分区也可以减少分区,但是都会产生shuflle,如果减少分区的化建议使用coalesc避免发生shuffle
rdd__repartition1 = rdd1.repartition(5)
print("increase partition",rdd__repartition1.glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]
rdd__repartition2 = rdd1.repartition(2)
print("decrease partition",rdd__repartition2.glom().collect())#decrease partition [[1, 2, 5, 6], [3, 4]]
#TODO:   3-减少分区--coalese
print(rdd1.coalesce(2).glom().collect())#[[1, 2], [3, 4, 5, 6]]
print(rdd1.coalesce(5).glom().collect())#[[1, 2], [3, 4], [5, 6]]
print(rdd1.coalesce(5,True).glom().collect())#[[], [1, 2], [5, 6], [3, 4], []]
# 结论:repartition默认调用的是coalese的shuffle为True的方法
# TODO:  4-PartitonBy,可以调整分区,还可以调整分区器(一种hash分区器(一般打散数据),一种range分区器(排序拍好的))
# 此类专门针对RDD中数据类型为KeyValue对提供函数
# rdd五大特性中有第四个特点key-value分区器,默认是hashpartitioner分区器
rdd__map = rdd1.map(lambda x: (x, x))
print("partitions length:",rdd__map.getNumPartitions())#partitions length: 3
print(rdd__map.partitionBy(2).glom().collect())

聚合函数

  • 代码:
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
def addNum(x,y):
return x+y
if __name__ == '__main__':
# TODO:  1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# TODO:   2-使用reduce进行聚合计算
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
from operator import add
# 直接得到返回值-21
print(rdd1.reduce(add))
# TODO: 3-使用fold进行聚合计算
# 第一个参数zeroValue是初始值,会参与分区的计算
 #第二个参数是执行运算的operation
print(rdd1.fold(0, add))  # 21
print(rdd1.getNumPartitions())  # 3
print(rdd1.glom().collect())
print("fold result:", rdd1.fold(10, add))
# TODO: 3-使用aggreate进行聚合计算
# seqOp分区内的操作, combOp分区间的操作
print(rdd1.aggregate(0, add, add))  # 21
print(rdd1.glom().collect())
print("aggregate result:", rdd1.aggregate(1, add, add))  # aggregate result: 25
# 结论:fold是aggregate的简化版本,fold分区内和分区间的函数是一致的
print("aggregate result:", rdd1.aggregate(1, addNum, addNum))  # aggregate result: 25
* byKey类的聚合函数
* **groupByKey----如何获取value的数据?------答案:result.mapValue(list).collect**
* **reduceByKey**
* foldBykey

  • aggregateByKey
  • CombineByKey:这是一个更为底层实现的bykey 聚合算子,可以实现更多复杂功能

  • 案例1:
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
'''
对初始值进行操作
'''
def createCombiner(value): #('a',[1])
return [value]
# 这里的x=createCombiner得到的[value]结果
def mergeValue(x,y): #这里相同a的value=y=1
x.append(y)#('a', [1, 1]),('b', [1])
return x
def mergeCombiners(a,b):
a.extend(b)
return a
if __name__ == '__main__':
# TODO:  1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# TODO:  2-基础数据处理
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# [(a:[1,1]),(b,[1,1])]
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# 使用自定义集聚合函数组合每个键的元素的通用功能。
# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
# 对初始值进行操作
# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
# 对分区内的元素进行合并
# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
# 对分区间的元素进行合并
by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(sorted(by_key_result.collect()))#[('a', [1, 1]), ('b', [1])]
  • 案例2
# -*- coding: utf-8 -*-
# Program function:完成单Value类型RDD的转换算子的演示
from pyspark import SparkConf, SparkContext
import re
'''
分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
分区间:有一些操作分区间做一些累加
alt+6 可以调出来所有TODO,
TODO是Python提供了预留功能的地方
'''
'''
对初始值进行操作
[value,1],value指的是当前学生成绩,1代表的是未来算一下一个学生考了几次考试
("Fred", 88)----------[88,1]
'''
def createCombiner(value):  #
return [value, 1]
'''
x代表的是 [value,1]值,x=[88,1]
y代表的相同key的value,比如("Fred", 95)的95,执行分区内的累加
'''
def mergeValue(x, y):
return [x[0] + y, x[1] + 1]
'''
a = a[0] value,a[1] 几次考试
'''
def mergeCombiners(a, b):
return [a[0] + b[0], a[1] + b[1]]
if __name__ == '__main__':
# TODO:  1-创建SparkContext申请资源
conf = SparkConf().setAppName("mini2").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
# TODO:  2-基础数据处理
from operator import add
# 这里需要实现需求:求解一个学生的平均成绩
x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
print(x.glom().collect())
# 第一个分区("Fred", 88), ("Fred", 95)
# 第二个分区("Fred", 91), ("Wilma", 93),
# 第三个分区("Wilma", 95), ("Wilma", 98)
# reduceByKey
reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y)
print("reduceBykey:", reduce_by_key_rdd.collect())  # [('Fred', 274), ('Wilma', 286)]
# 如何求解平均成绩?
# 使用自定义集聚合函数组合每个键的元素的通用功能。
# - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
# 对初始值进行操作
# - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
# 对分区内的元素进行合并
# - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
# 对分区间的元素进行合并
combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
print(combine_by_key_rdd.collect())  # [('Fred', [274, 3]), ('Wilma', [286, 3])]
# 接下来平均值如何实现--('Fred', [274, 3])---x[0]=Fred x[1]= [274, 3],x[1][0]=274,x[1][1]=3
print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
  • 面试题:

  • 关联函数

AI副业实战手册:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及实战案例,持续更新,实战类小册排名第一,做三个月挣不到钱找我退款,交个朋友的产品)

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
28天前
|
机器学习/深度学习 人工智能 分布式计算
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
105 35
|
26天前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
60 7
|
1月前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
2月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
102 2
|
3月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
4月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
113 1
|
4月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
59 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
4月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
50 0
|
4月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
129 0
|
20天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
182 92