Spark编程实验三:Spark SQL编程

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark编程实验三:Spark SQL编程

一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;

2、熟悉RDD到DataFrame的转化方法;

3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

       将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

(1)查询所有数据;

(2)查询所有数据,并去除重复的数据;

(3)查询所有数据,打印时去除id字段;

(4)筛选出age>30的记录;

(5)将数据按age分组;

(6)将数据按name升序排列;

(7)取出前3行数据;

(8)查询所有记录的name列,并为其取别名为username;

(9)查询年龄age的平均值;

(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

       源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

       将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

       源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
        personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序:

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

       Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。

       在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。

       最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。


相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
53 2
|
13天前
|
SQL JSON 分布式计算
|
13天前
|
SQL 分布式计算 Java
|
6天前
|
SQL IDE Java
Java连接SQL Server数据库的详细操作流程
Java连接SQL Server数据库的详细操作流程
|
14天前
|
SQL DataWorks NoSQL
DataWorks产品使用合集之如何将SQL Server中的数据转存到MongoDB
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
241 1
|
1月前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
161 1
|
7天前
|
SQL 存储 关系型数据库
关系型数据库中的SQL Server
【6月更文挑战第11天】
43 3
|
6天前
|
SQL IDE Java
Java连接SQL Server数据库的详细操作流程
Java连接SQL Server数据库的详细操作流程
|
7天前
|
SQL 弹性计算 API
云服务器 ECS产品使用问题之如何通过API调用阿里云服务器上SQL Server数据库中的数据
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
3天前
|
SQL 存储 关系型数据库
ArcGIS Engine连接ArcSDE SQL Server(获得所有SDE图层)
ArcGIS Engine连接ArcSDE SQL Server(获得所有SDE图层)
14 0