Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkSQL概述

1.1 什么是SparkSQL

Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。

1.2 为什么要有SparkSQL

1.3 SparkSQL的发展

1、发展历史

RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。

不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。

2、三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。

(3)三者有许多共同的函数,例如filter,sortby等

(4)三者都会根据Spark的内存情况自动缓存运算。

(5)三者都有分区的概念

1.4 SparkSQL的特点

1、易整合:无缝的整合了SQL查询和Spark编程

2、统一的数据访问方式:使用相同的方式连接不同的数据源

3、兼容Hive:在已有的仓库上直接运行SQL或者HQL

4、标准的数据连接:通过JDBC或者ODBC来连接

二、SparkSQL 编程

2.1 SparkSession 新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;

(2)一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

2.2 常用方式

2.2.1 方法调用案例实操

1、创建一个maven工程SparkSQL

2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}

4、需求

统计人次和统计每人最大年龄

5、在pom.xml文件中添加spark-sql的依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>

6、代码实现

(1)添加javaBean的User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:45
 * @Version 1.0
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private Long age;
}

(2)统计每个人的最大年龄

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
/**
 * @ClassName Test01_MaxAge
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:48
 * @Version 1.0
 */
public class Test01_MaxAge {
   public static void main(String[] args) {
       //1、创建配置对象
       SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");
       //2、创建SparkSession
       SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
       //3、获取DS
       Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
       //4、将json中的行转换为User对象
       Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
           @Override
           public User call(Row value) throws Exception {
               return new User(value.getString(1), value.getLong(0));
           }
       }, Encoders.bean(User.class));
       System.out.println("--------由jsonDS转换为UserDS后输出--------");
       mapDS.show();
        //5、分组
       KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
           @Override
           public String call(User value) throws Exception {
               return value.getName();
           }
       }, Encoders.STRING());
       //6、统计,取最大值
       Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
           @Override
           public User call(User v1, User v2) throws Exception {
               return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
           }
       });
       System.out.println("--------------输出结果-------------");
       resultDS.show();
       //x、关闭资源
       spark.close();
   }
}

运行结果:

(3)统计人次代码编写

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_UserVisitsCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 21:18
 * @Version 1.0
 */
public class Test02_UserVisitsCount {
    /**
     * 在sparkSql中DS直接支持的转换算子有:
     * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
     * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
     */
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS(从文件中按行读取数据文件)
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //4、将jsonDS中每行数据转换为对象
         Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("-------------由jsonDS转换为UserDS输出-------------");
         UserDS.show();
         //5、根据字段分组
         RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));
         System.out.println("--------由UserDS根据字段Name分组后输出----------");
         System.out.println(groupByDS.toString());
         //6、统计次数
         Dataset<Row> countDS = groupByDS.count();
         System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
         countDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:

在SparkSQL中DS直接支持的转换算子有:

map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。

2.2.2 SQL使用方式

1、需求:查询年龄大于18岁的用户信息

2、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test03_AgeGt18
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:24
 * @Version 1.0
 */
public class Test03_AgeGt18_sql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         System.out.println("------------读取数据之后输出jsonDS------------");
         jsonDS.show();
         //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
         jsonDS.createOrReplaceTempView("user_info");
         //5、查询数据中年龄大于18的用户
         Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");
         //6、输出结果
         System.out.println("---------------输出年龄大于18的结果---------------");
         sql.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:


相关文章
|
3天前
|
分布式计算 Java 关系型数据库
|
8天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
18天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
140 1
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
162 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
4月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
|
15天前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
20天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
141 0
|
3月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
138 0