Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkSQL概述

1.1 什么是SparkSQL

Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。

1.2 为什么要有SparkSQL

1.3 SparkSQL的发展

1、发展历史

RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。

不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。

2、三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。

(3)三者有许多共同的函数,例如filter,sortby等

(4)三者都会根据Spark的内存情况自动缓存运算。

(5)三者都有分区的概念

1.4 SparkSQL的特点

1、易整合:无缝的整合了SQL查询和Spark编程

2、统一的数据访问方式:使用相同的方式连接不同的数据源

3、兼容Hive:在已有的仓库上直接运行SQL或者HQL

4、标准的数据连接:通过JDBC或者ODBC来连接

二、SparkSQL 编程

2.1 SparkSession 新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;

(2)一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

2.2 常用方式

2.2.1 方法调用案例实操

1、创建一个maven工程SparkSQL

2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}

4、需求

统计人次和统计每人最大年龄

5、在pom.xml文件中添加spark-sql的依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>

6、代码实现

(1)添加javaBean的User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:45
 * @Version 1.0
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private Long age;
}

(2)统计每个人的最大年龄

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
/**
 * @ClassName Test01_MaxAge
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:48
 * @Version 1.0
 */
public class Test01_MaxAge {
   public static void main(String[] args) {
       //1、创建配置对象
       SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");
       //2、创建SparkSession
       SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
       //3、获取DS
       Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
       //4、将json中的行转换为User对象
       Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
           @Override
           public User call(Row value) throws Exception {
               return new User(value.getString(1), value.getLong(0));
           }
       }, Encoders.bean(User.class));
       System.out.println("--------由jsonDS转换为UserDS后输出--------");
       mapDS.show();
        //5、分组
       KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
           @Override
           public String call(User value) throws Exception {
               return value.getName();
           }
       }, Encoders.STRING());
       //6、统计,取最大值
       Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
           @Override
           public User call(User v1, User v2) throws Exception {
               return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
           }
       });
       System.out.println("--------------输出结果-------------");
       resultDS.show();
       //x、关闭资源
       spark.close();
   }
}

运行结果:

(3)统计人次代码编写

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_UserVisitsCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 21:18
 * @Version 1.0
 */
public class Test02_UserVisitsCount {
    /**
     * 在sparkSql中DS直接支持的转换算子有:
     * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
     * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
     */
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS(从文件中按行读取数据文件)
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //4、将jsonDS中每行数据转换为对象
         Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("-------------由jsonDS转换为UserDS输出-------------");
         UserDS.show();
         //5、根据字段分组
         RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));
         System.out.println("--------由UserDS根据字段Name分组后输出----------");
         System.out.println(groupByDS.toString());
         //6、统计次数
         Dataset<Row> countDS = groupByDS.count();
         System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
         countDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:

在SparkSQL中DS直接支持的转换算子有:

map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。

2.2.2 SQL使用方式

1、需求:查询年龄大于18岁的用户信息

2、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test03_AgeGt18
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:24
 * @Version 1.0
 */
public class Test03_AgeGt18_sql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         System.out.println("------------读取数据之后输出jsonDS------------");
         jsonDS.show();
         //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
         jsonDS.createOrReplaceTempView("user_info");
         //5、查询数据中年龄大于18的用户
         Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");
         //6、输出结果
         System.out.println("---------------输出年龄大于18的结果---------------");
         sql.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:


相关文章
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
48 1
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
58 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
23 0
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
77 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
92 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
67 0
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
54 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
82 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
50 0