Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkSQL概述

1.1 什么是SparkSQL

Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。

1.2 为什么要有SparkSQL

1.3 SparkSQL的发展

1、发展历史

RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)

如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。

不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。

2、三者的共性

(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。

(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。

(3)三者有许多共同的函数,例如filter,sortby等

(4)三者都会根据Spark的内存情况自动缓存运算。

(5)三者都有分区的概念

1.4 SparkSQL的特点

1、易整合:无缝的整合了SQL查询和Spark编程

2、统一的数据访问方式:使用相同的方式连接不同的数据源

3、兼容Hive:在已有的仓库上直接运行SQL或者HQL

4、标准的数据连接:通过JDBC或者ODBC来连接

二、SparkSQL 编程

2.1 SparkSession 新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:

(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;

(2)一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

2.2 常用方式

2.2.1 方法调用案例实操

1、创建一个maven工程SparkSQL

2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:

{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}

4、需求

统计人次和统计每人最大年龄

5、在pom.xml文件中添加spark-sql的依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>

6、代码实现

(1)添加javaBean的User

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
/**
 * @ClassName User
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:45
 * @Version 1.0
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class User implements Serializable {
    private String name;
    private Long age;
}

(2)统计每个人的最大年龄

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
/**
 * @ClassName Test01_MaxAge
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 20:48
 * @Version 1.0
 */
public class Test01_MaxAge {
   public static void main(String[] args) {
       //1、创建配置对象
       SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge");
       //2、创建SparkSession
       SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
       //3、获取DS
       Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
       //4、将json中的行转换为User对象
       Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() {
           @Override
           public User call(Row value) throws Exception {
               return new User(value.getString(1), value.getLong(0));
           }
       }, Encoders.bean(User.class));
       System.out.println("--------由jsonDS转换为UserDS后输出--------");
       mapDS.show();
        //5、分组
       KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() {
           @Override
           public String call(User value) throws Exception {
               return value.getName();
           }
       }, Encoders.STRING());
       //6、统计,取最大值
       Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() {
           @Override
           public User call(User v1, User v2) throws Exception {
               return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge()));
           }
       });
       System.out.println("--------------输出结果-------------");
       resultDS.show();
       //x、关闭资源
       spark.close();
   }
}

运行结果:

(3)统计人次代码编写

import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_UserVisitsCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/6/30 21:18
 * @Version 1.0
 */
public class Test02_UserVisitsCount {
    /**
     * 在sparkSql中DS直接支持的转换算子有:
     * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、
     * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
     */
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS(从文件中按行读取数据文件)
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //4、将jsonDS中每行数据转换为对象
         Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("-------------由jsonDS转换为UserDS输出-------------");
         UserDS.show();
         //5、根据字段分组
         RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name"));
         System.out.println("--------由UserDS根据字段Name分组后输出----------");
         System.out.println(groupByDS.toString());
         //6、统计次数
         Dataset<Row> countDS = groupByDS.count();
         System.out.println("--------------由分组后的groupDS进行count统计后输出-------------");
         countDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:

在SparkSQL中DS直接支持的转换算子有:

map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。

2.2.2 SQL使用方式

1、需求:查询年龄大于18岁的用户信息

2、代码编写

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test03_AgeGt18
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:24
 * @Version 1.0
 */
public class Test03_AgeGt18_sql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         System.out.println("------------读取数据之后输出jsonDS------------");
         jsonDS.show();
         //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖
         jsonDS.createOrReplaceTempView("user_info");
         //5、查询数据中年龄大于18的用户
         Dataset<Row> sql = spark.sql("select age,name from user_info where age>18");
         //6、输出结果
         System.out.println("---------------输出年龄大于18的结果---------------");
         sql.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果:


相关文章
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
49 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
21 0
|
2月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
70 0
|
2月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
87 0
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
56 0
|
2月前
|
存储 SQL 分布式计算
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(一)
49 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
129 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
45 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
101 0