Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkSQL概述

1.1 什么是SparkSQL

Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。

1.2 为什么要有SparkSQL

1.3 SparkSQL的发展

1、发展历史

RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。

不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。

2、三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。

(3)三者有许多共同的函数,例如filter,sortby等

(4)三者都会根据Spark的内存情况自动缓存运算。

(5)三者都有分区的概念

1.4 SparkSQL的特点

1、易整合:无缝的整合了SQL查询和Spark编程

2、统一的数据访问方式:使用相同的方式连接不同的数据源

3、兼容Hive:在已有的仓库上直接运行SQL或者HQL

4、标准的数据连接:通过JDBC或者ODBC来连接

二、SparkSQL 编程

2.1 SparkSession 新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;

(2)一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

2.2 常用方式

2.2.1 方法调用案例实操

1、创建一个maven工程SparkSQL

2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}

4、需求

统计人次和统计每人最大年龄

5、在pom.xml文件中添加spark-sql的依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>

6、代码实现

(1)添加javaBean的User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:45
 * @Version 1.0
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private Long age;
}

(2)统计每个人的最大年龄

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
/**
 * @ClassName Test01_MaxAge
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:48
 * @Version 1.0
 */
public class Test01_MaxAge {
   public static void main(String[] args) {
       //1、创建配置对象
       SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");
       //2、创建SparkSession
       SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
       //3、获取DS
       Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
       //4、将json中的行转换为User对象
       Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
           @Override
           public User call(Row value) throws Exception {
               return new User(value.getString(1), value.getLong(0));
           }
       }, Encoders.bean(User.class));
       System.out.println("--------由jsonDS转换为UserDS后输出--------");
       mapDS.show();
        //5、分组
       KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
           @Override
           public String call(User value) throws Exception {
               return value.getName();
           }
       }, Encoders.STRING());
       //6、统计,取最大值
       Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
           @Override
           public User call(User v1, User v2) throws Exception {
               return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
           }
       });
       System.out.println("--------------输出结果-------------");
       resultDS.show();
       //x、关闭资源
       spark.close();
   }
}

运行结果:

(3)统计人次代码编写

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_UserVisitsCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 21:18
 * @Version 1.0
 */
public class Test02_UserVisitsCount {
    /**
     * 在sparkSql中DS直接支持的转换算子有:
     * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
     * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
     */
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS(从文件中按行读取数据文件)
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //4、将jsonDS中每行数据转换为对象
         Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("-------------由jsonDS转换为UserDS输出-------------");
         UserDS.show();
         //5、根据字段分组
         RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));
         System.out.println("--------由UserDS根据字段Name分组后输出----------");
         System.out.println(groupByDS.toString());
         //6、统计次数
         Dataset<Row> countDS = groupByDS.count();
         System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
         countDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:

在SparkSQL中DS直接支持的转换算子有:

map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。

2.2.2 SQL使用方式

1、需求:查询年龄大于18岁的用户信息

2、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test03_AgeGt18
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:24
 * @Version 1.0
 */
public class Test03_AgeGt18_sql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         System.out.println("------------读取数据之后输出jsonDS------------");
         jsonDS.show();
         //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
         jsonDS.createOrReplaceTempView("user_info");
         //5、查询数据中年龄大于18的用户
         Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");
         //6、输出结果
         System.out.println("---------------输出年龄大于18的结果---------------");
         sql.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:


相关文章
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
3 秒音频也能克隆?拆解 Spark-TTS 架构的极致小样本学习
本文深入解析了 Spark-TTS 模型的架构与原理,该模型仅需 3 秒语音样本即可实现高质量的零样本语音克隆。其核心创新在于 BiCodec 单流语音编码架构,将语音信号分解为语义 Token 和全局 Token,实现内容与音色解耦。结合大型语言模型(如 Qwen 2.5),Spark-TTS 能直接生成语义 Token 并还原波形,简化推理流程。实验表明,它不仅能克隆音色、语速和语调,还支持跨语言朗读及情感调整。尽管面临相似度提升、样本鲁棒性等挑战,但其技术突破为定制化 AI 声音提供了全新可能。
373 35
|
11月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
191 0
|
11月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
296 0
|
11月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
142 0
|
11月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
213 0
|
11月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
201 0
|
11月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
250 0
|
分布式计算
SparkStream mapWithState编程练习
SparkStream在处理流数据时,按时间间隔把数据分成小批,在一个小批中利用RDD 的函数完成各种运算。如果要在各小批之间共享数据,或者保存到每批次的数据到一个集中变量中,就要用到mapWithState函数,在整个流计算任务中维护了一个key-value State对象(应该也是一个RDD),根据本批次的任务更改State。
1541 0
|
3月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
201 0
|
6月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
289 79

热门文章

最新文章