Python+大数据学习笔记(一)

简介: Python+大数据学习笔记(一)

PySpark使用

pyspark:

• pyspark = python + spark

• 在pandas、numpy进行数据处理时,一次性将数据读入 内存中,当数据很大时内存溢出,无法处理;此外,很

多执行算法是单线程处理,不能充分利用cpu性能

spark的核心概念之一是shuffle,它将数据集分成数据块, 好处是:

• 在读取数据时,不是将数据一次性全部读入内存中,而

是分片,用时间换空间进行大数据处理

• 极大的利用了CPU资源

• 支持分布式结构,弹性拓展硬件资源。

pyspark:

• 在数据结构上Spark支持dataframe、sql和rdd模型 • 算子和转换是Spark中最重要的两个动作

• 算子好比是盖房子中的画图纸,转换是搬砖盖房子。有 时候我们做一个统计是多个动作结合的组合拳,spark常 将一系列的组合写成算子的组合执行,执行时,spark会

对算子进行简化等优化动作,执行速度更快

pyspark操作: • 对数据进行切片(shuffle)

config(“spark.default.parallelism”, 3000)

假设读取的数据是20G,设置成3000份,每次每个进程

(线程)读取一个shuffle,可以避免内存不足的情况

• 设置程序的名字

appName(“taSpark”)

• 读文件

data = spark.read.csv(cc,header=None, inferSchema=“true”)

• 配置spark context

Spark 2.0版本之后只需要创建一个SparkSession即可

from pyspark.sql import SparkSession
spark=SparkSession
.builder
.appName(‘hotel_rec_app’)
.getOrCreate()
# Spark+python 进行wordCount
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.master("local[*]")\
.getOrCreate()
# 将文件转换为RDD对象
lines = spark.read.text("input.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x, y: x + y)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()

PySpark中的DataFrame

• DataFrame类似于Python中的数据表,允许处理大量结

构化数据

• DataFrame优于RDD,同时包含RDD的功能

# 从集合中创建RDD
rdd = spark.sparkContext.parallelize([
(1001, "张飞", 8341, "坦克"),
(1002, "关羽", 7107, "战士"),
(1003, "刘备", 6900, "战士")
])
# 指定模式, StructField(name,dataType,nullable)
# name: 该字段的名字,dataType:该字段的数据类型,
nullable: 指示该字段的值是否为空
from pyspark.sql.types import StructType, StructField, 
LongType, StringType # 导入类型
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("hp", LongType(), True), #生命值
StructField("role_main", StringType(), True)
])
# 对RDD应用该模式并且创建DataFrame
heros = spark.createDataFrame(rdd, schema)
heros.show()
# 利用DataFrame创建一个临时视图
heros.registerTempTable("HeroGames")
# 查看DataFrame的行数
print(heros.count())
# 使用自动类型推断的方式创建dataframe
data = [(1001, "张飞", 8341, "坦克"),
(1002, "关羽", 7107, "战士"),
(1003, "刘备", 6900, "战士")]
df = spark.createDataFrame(data, schema=['id', 'name', 
'hp', 'role_main'])
print(df) #只能显示出来是DataFrame的结果
df.show() #需要通过show将内容打印出来
print(df.count())
3
DataFrame[id: bigint, name: string, hp: bigint, role_main: 
string]
| id|name| hp|role_main|
+----+-------+-----+-------------+
|1001|张飞|8341| 坦克|
|1002|关羽|7107| 战士|
|1003|刘备|6900| 战士| +----+-------+-----+-------------+ 3
从CSV文件中读取
heros = spark.read.csv("./heros.csv", header=True, 
inferSchema=True)
heros.show()
• 从MySQL中读取
df = spark.read.format('jdbc').options(
url='jdbc:mysql://localhost:3306/wucai?useUnicode=true&
useJDBCCompliantTimezoneShift=true&useLegacyDatetim
eCode=false&serverTimezone=Asia/Shanghai',
dbtable='heros',
user='root',
password='passw0rdcc4'
).load()
print('连接JDBC,调用Heros数据表')
df.show()


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
3月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
3月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的台风灾害分析及预测系统
针对台风灾害预警滞后、精度不足等问题,本研究基于Python与大数据技术,构建多源数据融合的台风预测系统。利用机器学习提升路径与强度预测准确率,结合Django框架实现动态可视化与实时预警,为防灾决策提供科学支持,显著提高应急响应效率,具有重要社会经济价值。
|
3月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
3月前
|
数据可视化 大数据 关系型数据库
基于python大数据技术的医疗数据分析与研究
在数字化时代,医疗数据呈爆炸式增长,涵盖患者信息、检查指标、生活方式等。大数据技术助力疾病预测、资源优化与智慧医疗发展,结合Python、MySQL与B/S架构,推动医疗系统高效实现。
|
4月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
335 14
|
6月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
224 4
|
5月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
197 0

推荐镜像

更多