像写SQL一样去处理内存中的数据,SparkSQL入门教程

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
RDS MySQL DuckDB 分析主实例,集群系列 8核16GB
简介: 读取到的数据是DataFrame,接下来的操作就是对DataFrame的操作了。(五)总结SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

(一)概述


(五)总结

SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

Dataset是一个数据的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的优点和SparkSQL优化执行引擎的优点,一个Dataset相当于RDD+Schema的结合。


Dataset的底层封装是RDD,当RDD的泛型是Row类型时,该类型就可以称为DataFrame。DataFrame是一种表格型的数据结构,就和传统的Mysql结构一样,通过DataFrame我们可以更加高效地去执行Sql。


(二)SparkSQL实战


使用SparkSQL首先需要引入相关的依赖:


<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.0</version></dependency>

该依赖需要和sparkCore保持一致。


SparkSQL的编码主要通过四步:


  1. 创建SparkSession


  1. 获取数据


  1. 执行SQL


  1. 关闭SparkSession
publicclassSqlTest {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Dataset<Row>json=sparkSession.read().json("data/json");
json.printSchema();
json.show();
sparkSession.stop();
    }
}

在data的目录下创建一个名为json的文件

{"name":"a","age":23}
{"name":"b","age":24}
{"name":"c","age":25}
{"name":"d","age":26}
{"name":"e","age":27}
{"name":"f","age":28}

运行项目后输出两个结果,schema结果如下:


网络异常,图片无法展示
|


Dataset<Row>输出结果如下:


网络异常,图片无法展示
|


通过SparkSQL可以执行和SQL十分相似的查询操作:

publicclassSqlTest {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Dataset<Row>json=sparkSession.read().json("data/json");
json.select("age","name").where("age > 26").show();
sparkSession.stop();
    }
}

在上面的语句中,通过一系列的API实现了SQL查询操作,除此之外,SparkSQL还支持直接写原始SQL语句的操作。


在写SQL语句之前,首先需要让Spark知道对哪个表进行查询,因此需要建立一张临时表,再执行SQL查询:


json.createOrReplaceTempView("json");
sparkSession.sql("select * from json where age > 26").show();

(三)非JSON格式的Dataset创建


在上一节中创建Dataset时使用了最简单的json,因为json自己带有schema结构,因此不需要手动去增加,如果是一个txt文件,就需要在创建Dataset时手动塞入schema。

下面展示读取txt文件的例子,首先创建一个user.txt


a23b24c25d26

现在我要将上面的这几行变成DataFrame,第一列表示姓名,第二列表示年龄,于是就可以像下面这样操作:

publicclassSqlTest2 {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
SparkContextsparkContext=sparkSession.sparkContext();
JavaSparkContextsc=newJavaSparkContext(sparkContext);
JavaRDD<String>lines=sc.textFile("data/user.txt");
//将String类型转化为Row类型JavaRDD<Row>rowJavaRDD=lines.map(newFunction<String, Row>() {
@OverridepublicRowcall(Stringv1) throwsException {
String[] split=v1.split(" ");
returnRowFactory.create(
split[0],
Integer.valueOf(split[1])
                );
            }
        });
//定义schemaList<StructField>structFields=Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
        );
StructTypestructType=DataTypes.createStructType(structFields);
//生成dataFrameDataset<Row>dataFrame=sparkSession.createDataFrame(rowJavaRDD, structType);
dataFrame.show();
    }
}

(四)通过JDBC创建DataFrame


通过JDBC可直接将对应数据库中的表放入Spark中进行一些处理,下面通过MySQL进行展示。 使用MySQL需要在依赖中引入MySQL的引擎:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.46</version></dependency>

接着通过类似JDBC的方式读取MySQL数据:

publicclassSqlTest3 {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Map<String,String>options=newHashMap<>();
options.put("url","jdbc:mysql://127.0.0.1:3306/books");
options.put("driver","com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","123456");
options.put("dbtable","book");
Dataset<Row>jdbc=sparkSession.read().format("jdbc").options(options).load();
jdbc.show();
sparkSession.close();
    }
}

读取到的数据是DataFrame,接下来的操作就是对DataFrame的操作了。


(五)总结


SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
805 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
291 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL
SQL如何只让特定列中只显示一行数据
SQL如何只让特定列中只显示一行数据
|
9月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
229 4
|
4月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
5月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据
|
5月前
|
SQL 关系型数据库 MySQL
SQL如何对不同表的数据进行更新
本文介绍了如何将表A的Col1数据更新到表B的Col1中,分别提供了Microsoft SQL和MySQL的实现方法,并探讨了多表合并后更新的优化方式,如使用MERGE语句提升效率。适用于数据库数据同步与批量更新场景。
|
7月前
|
SQL 数据挖掘 关系型数据库
【SQL 周周练】一千条数据需要做一天,怎么用 SQL 处理电表数据(如何动态构造自然月)
题目来自于某位发帖人在某 Excel 论坛的求助,他需要将电表缴费数据按照缴费区间拆开后再按月份汇总。当时用手工处理数据,自称一千条数据就需要处理一天。我将这个问题转化为 SQL 题目。
252 12
|
6月前
|
SQL DataWorks 数据管理
SQL血缘分析实战!数据人必会的3大救命场景
1. 开源工具:Apache Atlas(元数据管理)、Spline(血缘追踪) 2. 企业级方案:阿里DataWorks血缘分析、腾讯云CDW血缘引擎 3. 自研技巧:在ETL脚本中植入版本水印,用注释记录业务逻辑变更 📌 重点总结:
|
7月前
|
SQL 数据采集 资源调度
【SQL 周周练】爬取短视频发现数据缺失,如何用 SQL 填充
爬虫爬取抖音和快手的短视频数据时,如果遇到数据缺失的情况,如何使用 SQL 语句完成数据的补全。
177 5