Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

2.2.3 DSL特殊语法

1、需求:查询年龄大于18的用户信息

2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test04_AgeGt18_DSL
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:31
 * @Version 1.0
 */
public class Test04_AgeGt18_DSL {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件内容,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
        System.out.println("------------读取数据之后输出jsonDS------------");
        jsonDS.show();
        //4、查询年龄大于18的用户信息
        Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18));
        System.out.println("-------------------输出whereDS的结果-----------------");
        whereDS.show();
        Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18));
        System.out.println("-----------------输出filterDS的结果------------------");
        filterDS.show();
        //5、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3 SQL语法的用户自定义函数

2.3.1 UDF

1、UDF:一进一出

2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.udf;
/**
 * @ClassName Test05_UDF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:41
 * @Version 1.0
 */
public class Test05_UDF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("input/user.json");
        //4、创建UDF函数addName:为输出参数拼接 is numberOne
        UserDefinedFunction addName = udf(new UDF1<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + " is numberone";
            }
        }, DataTypes.StringType);
        //5、为创建的函数进行注册
        sparkSession.udf().register("addName",addName);
        //6、根据jsonDS创建视图user_info
        jsonDS.createOrReplaceTempView("user_info");
        //7、在SQL中使用函数
        Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info");
        //8、输出结果
        System.out.println("------------------输出结果--------------");
        sqlDS.show();
    }
}

运行结果

2.3.2 UDAF

1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。

2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf

3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

5、代码实现

(1)创建Buffer类

package code;
import bean.Buffer;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
/**
 * @ClassName Test06_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:51
 * @Version 1.0
 */
public class Test06_Buffer extends Aggregator<Long, Buffer,Double> {
    public Test06_Buffer() {
    }
    //初始化
    @Override
    public Buffer zero() {
        return new Buffer(0L,0L);
    }
    //单分区聚合
    @Override
    public Buffer reduce(Buffer b, Long a) {
        b.setSum(b.getSum()+a);
        b.setCount(b.getCount()+1);
        return b;
    }
    //多分区聚合
    @Override
    public Buffer merge(Buffer b1, Buffer b2) {
        b1.setSum(b1.getSum()+ b2.getSum());
        b1.setCount(b1.getCount()+ b2.getCount());
        return b1;
    }
    //最后逻辑运算
    @Override
    public Double finish(Buffer reduction) {
        return reduction.getSum().doubleValue()/ reduction.getCount();
    }
    //设置Buffer的编码格式
    @Override
    public Encoder<Buffer> bufferEncoder() {
        return Encoders.kryo(Buffer.class);
    }
    //设置返回值的编码格式
    @Override
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}

(2)创建自定义函数Test06_UDAF继承 Aggregator

package code;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.udaf;
/**
 * @ClassName Test_07_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:11
 * @Version 1.0
 */
public class Test06_UDAF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件并且转换为视图user_info
        sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info");
        //4、查询表中所有数据
        Dataset<Row> sqlDS = sparkSession.sql("select * from user_info");
        System.out.println("------------查询全表-----------");
        sqlDS.show();
        //5、注册函数
        sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG()));
        //6、查询全表平均年龄
        Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info");
        System.out.println("--------------查询全表平均年龄----------------");
        sqlDS1.show();
        //7、查询每个人的平均年龄
        Dataset<Row> sqlDS2 = sparkSession.sql("select  name ,MyAvg(age) as avg_age from user_info group by name");
        System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------");
        sqlDS2.show();
        //8、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3.3 UDTF(没有)

1、UDTF:输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分

3、SparkSQL数据的加载与保存

3.1 读取和保持文件

SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。

3.1.1 CSV文件

0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:

name,age
zhangsan,16
lisi,18,
wangwu,20

1、SparkSQL读取和写出CSV文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
/**
 * @ClassName Test01_CSV
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:30
 * @Version 1.0
 */
public class Test01_CSV {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、获取Reader对象
        DataFrameReader reader = sparkSession.read();
        //4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符
        Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",")
                .csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv");
        //5、输出
        System.out.println("----------------查看csvDS-------------");
        csvDS.show();
        //6、可以将csvDS转换为user对象的集合
     //   Dataset<User> userDS = csvDS.as(Encoders.bean(User.class));
        /**
         上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint.
         原因:csv读进来都是string
         */
        //7、使用map转换为user之前先将String转换2为Long
        Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(value.getString(0), Long.valueOf(value.getString(1)));
            }
        }, Encoders.bean(User.class));
        //8、输出
        System.out.println("-------------查看mapDS---------------");
        mapDS.show();
        //TODO 写出为csv文件
        //9、获取写对象
        DataFrameWriter<User> writer = mapDS.write();
        //10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等
        writer.option("seq","\t").mode(SaveMode.Overwrite)
                .option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv");
        //11、关闭SparkSession
        sparkSession.close();
    }
}

运行结果

3.1.2 JSON文件

1、SparkSQL读取和写出JSON文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_JSON
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:24
 * @Version 1.0
 */
public class Test02_JSON {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         jsonDS.show();
         //将jsonDS转换为userDS
         Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("----------查看userDS的数据------------");
         userDS.show();
         //输出为JSON文件
         DataFrameWriter<User> write = userDS.write();
         write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json");
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.1.3 Parquet文件

1、sparkSQL读取和写出列式存储文件。

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test03_Parquet
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:29
 * @Version 1.0
 */
public class Test03_Parquet {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         //我们想读取Parquet,但是没有,没有关系,我们可以先创建
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //ToDO 写出Parquet文件
         //获取写对象
         DataFrameWriter<Row> writer = jsonDS.write();
         //将jsonDS数据写出,以Parquet的格式写出
         writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //Todo 读取Parquet文件
         Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //输出
         outputParquetDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果

3.2 与MySQL交互(前提是自己的数据要有相关的表)

使用SparkSQL对mysql进行读写

1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2、从MySQL读写数据

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import java.util.Properties;
/**
 * @ClassName Test04_mysql
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:34
 * @Version 1.0
 */
public class Test04_mysql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取reader对象
         DataFrameReader reader = spark.read();
         //4、TODO   读取Mysql中的表的数据
         Properties properties = new Properties();
         properties.put("user","root");
         properties.put("password","123456");
         Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties);
         //5、查看数据
         System.out.println("---------------查看base_provinceDS-------------------");
         base_provinceDS.show();
         //6、查看数据中的偶数
         base_provinceDS.createOrReplaceTempView("base_province");
         Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0");
         //7、查看数据
         System.out.println("----------------数据中的偶数---------------");
         sqlDS.show();
         //Todo 向mysql中写数据
         System.out.println("-------------输出sqlDS数据类型----------------");
         sqlDS.printSchema();
         //8、获取Writer对象
         DataFrameWriter<Row> writer = sqlDS.write();
          writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&amp;useUnicode=true" +
                  "&characterEncoding=UTF-8","test_base_province",properties);
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.3 与Hive交互

SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。

3.3.1 Linux中的交互

1、添加mysql连接驱动到spark-yarn的jars目录

2、添加hive-site.xml文件到spark-yarn的conf目录

3、启动spark-sql的客户端即可

3.3.2 IDEA中的交互

1、向idea项目中的pom.xml文件中添加新的依赖

(1)添加依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>

(2)拷贝配置文件到resources目录

需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml

(3)记得开启metastore和Hiveserver2的服务

(4)代码实现

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test05_Hive
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 14:00
 * @Version 1.0
 */
public class Test05_Hive {
    public static void main(String[] args) {
        //1、设置系统用户名称为zhm
        System.setProperty("HADOOP_USER_NAME","zhm");
        //2、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive");
        //3、创建SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport().config(conf).getOrCreate();
        //4、显示所以表
        System.out.println("显示所有的表");
        sparkSession.sql("show tables;").show();
        //5、创建user_info表,字段包括name:string,age:bigint
        sparkSession.sql("create table user_info(name String,age bigint);");
        //6、向user_info表中插入数据,"张三",10
        sparkSession.sql("insert into  user_info values(\"zhangsan\",10);");
        //7、查询user_info表中数据
        System.out.println("------------输出user_info表中的内容--------------");
        sparkSession.sql("select * from user_info;").show();
        //8、关闭SparkSession
        sparkSession.close();
    }
}

运行结果:

恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
45 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
58 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
23 0
|
2月前
|
存储 分布式计算 算法
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
大数据-105 Spark GraphX 基本概述 与 架构基础 概念详解 核心数据结构
60 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
50 0
|
2月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
50 0
|
2月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
53 0
|
分布式计算 Java Spark
Spark学习之编程进阶——累加器与广播(5)
Spark学习之编程进阶——累加器与广播(5) 1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。 2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。 3. 累加器的用法: 通过在驱动器中调用SparkContex
1837 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
142 2
ClickHouse与大数据生态集成:Spark & Flink 实战