像写SQL一样去处理内存中的数据,SparkSQL入门教程

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 读取到的数据是DataFrame,接下来的操作就是对DataFrame的操作了。(五)总结SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

(一)概述


(五)总结

SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

Dataset是一个数据的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的优点和SparkSQL优化执行引擎的优点,一个Dataset相当于RDD+Schema的结合。


Dataset的底层封装是RDD,当RDD的泛型是Row类型时,该类型就可以称为DataFrame。DataFrame是一种表格型的数据结构,就和传统的Mysql结构一样,通过DataFrame我们可以更加高效地去执行Sql。


(二)SparkSQL实战


使用SparkSQL首先需要引入相关的依赖:


<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.0</version></dependency>

该依赖需要和sparkCore保持一致。


SparkSQL的编码主要通过四步:


  1. 创建SparkSession


  1. 获取数据


  1. 执行SQL


  1. 关闭SparkSession
publicclassSqlTest {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Dataset<Row>json=sparkSession.read().json("data/json");
json.printSchema();
json.show();
sparkSession.stop();
    }
}

在data的目录下创建一个名为json的文件

{"name":"a","age":23}
{"name":"b","age":24}
{"name":"c","age":25}
{"name":"d","age":26}
{"name":"e","age":27}
{"name":"f","age":28}

运行项目后输出两个结果,schema结果如下:


网络异常,图片无法展示
|


Dataset<Row>输出结果如下:


网络异常,图片无法展示
|


通过SparkSQL可以执行和SQL十分相似的查询操作:

publicclassSqlTest {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Dataset<Row>json=sparkSession.read().json("data/json");
json.select("age","name").where("age > 26").show();
sparkSession.stop();
    }
}

在上面的语句中,通过一系列的API实现了SQL查询操作,除此之外,SparkSQL还支持直接写原始SQL语句的操作。


在写SQL语句之前,首先需要让Spark知道对哪个表进行查询,因此需要建立一张临时表,再执行SQL查询:


json.createOrReplaceTempView("json");
sparkSession.sql("select * from json where age > 26").show();

(三)非JSON格式的Dataset创建


在上一节中创建Dataset时使用了最简单的json,因为json自己带有schema结构,因此不需要手动去增加,如果是一个txt文件,就需要在创建Dataset时手动塞入schema。

下面展示读取txt文件的例子,首先创建一个user.txt


a23b24c25d26

现在我要将上面的这几行变成DataFrame,第一列表示姓名,第二列表示年龄,于是就可以像下面这样操作:

publicclassSqlTest2 {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
SparkContextsparkContext=sparkSession.sparkContext();
JavaSparkContextsc=newJavaSparkContext(sparkContext);
JavaRDD<String>lines=sc.textFile("data/user.txt");
//将String类型转化为Row类型JavaRDD<Row>rowJavaRDD=lines.map(newFunction<String, Row>() {
@OverridepublicRowcall(Stringv1) throwsException {
String[] split=v1.split(" ");
returnRowFactory.create(
split[0],
Integer.valueOf(split[1])
                );
            }
        });
//定义schemaList<StructField>structFields=Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
        );
StructTypestructType=DataTypes.createStructType(structFields);
//生成dataFrameDataset<Row>dataFrame=sparkSession.createDataFrame(rowJavaRDD, structType);
dataFrame.show();
    }
}

(四)通过JDBC创建DataFrame


通过JDBC可直接将对应数据库中的表放入Spark中进行一些处理,下面通过MySQL进行展示。 使用MySQL需要在依赖中引入MySQL的引擎:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.46</version></dependency>

接着通过类似JDBC的方式读取MySQL数据:

publicclassSqlTest3 {
publicstaticvoidmain(String[] args) {
SparkSessionsparkSession=SparkSession.builder()
                .appName("sql")
                .master("local")
                .getOrCreate();
Map<String,String>options=newHashMap<>();
options.put("url","jdbc:mysql://127.0.0.1:3306/books");
options.put("driver","com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","123456");
options.put("dbtable","book");
Dataset<Row>jdbc=sparkSession.read().format("jdbc").options(options).load();
jdbc.show();
sparkSession.close();
    }
}

读取到的数据是DataFrame,接下来的操作就是对DataFrame的操作了。


(五)总结


SparkSQL是对Spark原生RDD的增强,虽然很多功能通过RDD就可以实现,但是SparkSQL可以更加灵活地实现一些功能。我是鱼仔,我们下期再见。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
26天前
|
存储 编译器 数据处理
C 语言结构体与位域:高效数据组织与内存优化
C语言中的结构体与位域是实现高效数据组织和内存优化的重要工具。结构体允许将不同类型的数据组合成一个整体,而位域则进一步允许对结构体成员的位进行精细控制,以节省内存空间。两者结合使用,可在嵌入式系统等资源受限环境中发挥巨大作用。
54 11
|
1月前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
2月前
|
监控 算法 应用服务中间件
“四两拨千斤” —— 1.2MB 数据如何吃掉 10GB 内存
一个特殊请求引发服务器内存用量暴涨进而导致进程 OOM 的惨案。
|
2月前
|
存储 C语言
数据在内存中的存储方式
本文介绍了计算机中整数和浮点数的存储方式,包括整数的原码、反码、补码,以及浮点数的IEEE754标准存储格式。同时,探讨了大小端字节序的概念及其判断方法,通过实例代码展示了这些概念的实际应用。
93 1
|
2月前
|
存储
共用体在内存中如何存储数据
共用体(Union)在内存中为所有成员分配同一段内存空间,大小等于最大成员所需的空间。这意味着所有成员共享同一块内存,但同一时间只能存储其中一个成员的数据,无法同时保存多个成员的值。
|
2月前
|
监控 Java easyexcel
面试官:POI大量数据读取内存溢出?如何解决?
【10月更文挑战第14天】 在处理大量数据时,使用Apache POI库读取Excel文件可能会导致内存溢出的问题。这是因为POI在读取Excel文件时,会将整个文档加载到内存中,如果文件过大,就会消耗大量内存。以下是一些解决这一问题的策略:
231 1
|
2月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
2月前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
53 2
|
2月前
|
存储 编译器
数据在内存中的存储
数据在内存中的存储
42 4
|
2月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录