Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

2.2.3 DSL特殊语法

1、需求:查询年龄大于18的用户信息

2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test04_AgeGt18_DSL
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:31
 * @Version 1.0
 */
public class Test04_AgeGt18_DSL {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件内容,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
        System.out.println("------------读取数据之后输出jsonDS------------");
        jsonDS.show();
        //4、查询年龄大于18的用户信息
        Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18));
        System.out.println("-------------------输出whereDS的结果-----------------");
        whereDS.show();
        Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18));
        System.out.println("-----------------输出filterDS的结果------------------");
        filterDS.show();
        //5、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3 SQL语法的用户自定义函数

2.3.1 UDF

1、UDF:一进一出

2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.udf;
/**
 * @ClassName Test05_UDF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:41
 * @Version 1.0
 */
public class Test05_UDF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("input/user.json");
        //4、创建UDF函数addName:为输出参数拼接 is numberOne
        UserDefinedFunction addName = udf(new UDF1<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + " is numberone";
            }
        }, DataTypes.StringType);
        //5、为创建的函数进行注册
        sparkSession.udf().register("addName",addName);
        //6、根据jsonDS创建视图user_info
        jsonDS.createOrReplaceTempView("user_info");
        //7、在SQL中使用函数
        Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info");
        //8、输出结果
        System.out.println("------------------输出结果--------------");
        sqlDS.show();
    }
}

运行结果

2.3.2 UDAF

1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。

2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf

3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

5、代码实现

(1)创建Buffer类

package code;
import bean.Buffer;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
/**
 * @ClassName Test06_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:51
 * @Version 1.0
 */
public class Test06_Buffer extends Aggregator<Long, Buffer,Double> {
    public Test06_Buffer() {
    }
    //初始化
    @Override
    public Buffer zero() {
        return new Buffer(0L,0L);
    }
    //单分区聚合
    @Override
    public Buffer reduce(Buffer b, Long a) {
        b.setSum(b.getSum()+a);
        b.setCount(b.getCount()+1);
        return b;
    }
    //多分区聚合
    @Override
    public Buffer merge(Buffer b1, Buffer b2) {
        b1.setSum(b1.getSum()+ b2.getSum());
        b1.setCount(b1.getCount()+ b2.getCount());
        return b1;
    }
    //最后逻辑运算
    @Override
    public Double finish(Buffer reduction) {
        return reduction.getSum().doubleValue()/ reduction.getCount();
    }
    //设置Buffer的编码格式
    @Override
    public Encoder<Buffer> bufferEncoder() {
        return Encoders.kryo(Buffer.class);
    }
    //设置返回值的编码格式
    @Override
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}

(2)创建自定义函数Test06_UDAF继承 Aggregator

package code;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.udaf;
/**
 * @ClassName Test_07_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:11
 * @Version 1.0
 */
public class Test06_UDAF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件并且转换为视图user_info
        sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info");
        //4、查询表中所有数据
        Dataset<Row> sqlDS = sparkSession.sql("select * from user_info");
        System.out.println("------------查询全表-----------");
        sqlDS.show();
        //5、注册函数
        sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG()));
        //6、查询全表平均年龄
        Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info");
        System.out.println("--------------查询全表平均年龄----------------");
        sqlDS1.show();
        //7、查询每个人的平均年龄
        Dataset<Row> sqlDS2 = sparkSession.sql("select  name ,MyAvg(age) as avg_age from user_info group by name");
        System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------");
        sqlDS2.show();
        //8、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3.3 UDTF(没有)

1、UDTF:输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分

3、SparkSQL数据的加载与保存

3.1 读取和保持文件

SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。

3.1.1 CSV文件

0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:

name,age
zhangsan,16
lisi,18,
wangwu,20

1、SparkSQL读取和写出CSV文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
/**
 * @ClassName Test01_CSV
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:30
 * @Version 1.0
 */
public class Test01_CSV {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、获取Reader对象
        DataFrameReader reader = sparkSession.read();
        //4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符
        Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",")
                .csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv");
        //5、输出
        System.out.println("----------------查看csvDS-------------");
        csvDS.show();
        //6、可以将csvDS转换为user对象的集合
     //   Dataset<User> userDS = csvDS.as(Encoders.bean(User.class));
        /**
         上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint.
         原因:csv读进来都是string
         */
        //7、使用map转换为user之前先将String转换2为Long
        Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(value.getString(0), Long.valueOf(value.getString(1)));
            }
        }, Encoders.bean(User.class));
        //8、输出
        System.out.println("-------------查看mapDS---------------");
        mapDS.show();
        //TODO 写出为csv文件
        //9、获取写对象
        DataFrameWriter<User> writer = mapDS.write();
        //10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等
        writer.option("seq","\t").mode(SaveMode.Overwrite)
                .option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv");
        //11、关闭SparkSession
        sparkSession.close();
    }
}

运行结果

3.1.2 JSON文件

1、SparkSQL读取和写出JSON文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_JSON
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:24
 * @Version 1.0
 */
public class Test02_JSON {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         jsonDS.show();
         //将jsonDS转换为userDS
         Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("----------查看userDS的数据------------");
         userDS.show();
         //输出为JSON文件
         DataFrameWriter<User> write = userDS.write();
         write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json");
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.1.3 Parquet文件

1、sparkSQL读取和写出列式存储文件。

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test03_Parquet
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:29
 * @Version 1.0
 */
public class Test03_Parquet {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         //我们想读取Parquet,但是没有,没有关系,我们可以先创建
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //ToDO 写出Parquet文件
         //获取写对象
         DataFrameWriter<Row> writer = jsonDS.write();
         //将jsonDS数据写出,以Parquet的格式写出
         writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //Todo 读取Parquet文件
         Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //输出
         outputParquetDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果

3.2 与MySQL交互(前提是自己的数据要有相关的表)

使用SparkSQL对mysql进行读写

1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2、从MySQL读写数据

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import java.util.Properties;
/**
 * @ClassName Test04_mysql
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:34
 * @Version 1.0
 */
public class Test04_mysql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取reader对象
         DataFrameReader reader = spark.read();
         //4、TODO   读取Mysql中的表的数据
         Properties properties = new Properties();
         properties.put("user","root");
         properties.put("password","123456");
         Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties);
         //5、查看数据
         System.out.println("---------------查看base_provinceDS-------------------");
         base_provinceDS.show();
         //6、查看数据中的偶数
         base_provinceDS.createOrReplaceTempView("base_province");
         Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0");
         //7、查看数据
         System.out.println("----------------数据中的偶数---------------");
         sqlDS.show();
         //Todo 向mysql中写数据
         System.out.println("-------------输出sqlDS数据类型----------------");
         sqlDS.printSchema();
         //8、获取Writer对象
         DataFrameWriter<Row> writer = sqlDS.write();
          writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&amp;useUnicode=true" +
                  "&characterEncoding=UTF-8","test_base_province",properties);
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.3 与Hive交互

SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。

3.3.1 Linux中的交互

1、添加mysql连接驱动到spark-yarn的jars目录

2、添加hive-site.xml文件到spark-yarn的conf目录

3、启动spark-sql的客户端即可

3.3.2 IDEA中的交互

1、向idea项目中的pom.xml文件中添加新的依赖

(1)添加依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>

(2)拷贝配置文件到resources目录

需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml

(3)记得开启metastore和Hiveserver2的服务

(4)代码实现

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test05_Hive
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 14:00
 * @Version 1.0
 */
public class Test05_Hive {
    public static void main(String[] args) {
        //1、设置系统用户名称为zhm
        System.setProperty("HADOOP_USER_NAME","zhm");
        //2、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive");
        //3、创建SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport().config(conf).getOrCreate();
        //4、显示所以表
        System.out.println("显示所有的表");
        sparkSession.sql("show tables;").show();
        //5、创建user_info表,字段包括name:string,age:bigint
        sparkSession.sql("create table user_info(name String,age bigint);");
        //6、向user_info表中插入数据,"张三",10
        sparkSession.sql("insert into  user_info values(\"zhangsan\",10);");
        //7、查询user_info表中数据
        System.out.println("------------输出user_info表中的内容--------------");
        sparkSession.sql("select * from user_info;").show();
        //8、关闭SparkSession
        sparkSession.close();
    }
}

运行结果:

恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
61 0
|
1月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
28 0
|
1月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
16 0
|
分布式计算 Spark Shell
spark概述与编程模型
spark快的原因1.内存计算 2.DAG spark shell已经初始化好了SparkContext,直接用sc调用即可 lineage 血统 RDD wide and narrow dependencies 窄依赖每个 RDD partition最多被一个子RDD partirion依赖 /sbin(system binary)放的都是涉及系统管理的命令。有些系统里面
1132 0
|
Web App开发 分布式计算 算法
Spark入门到精通视频学习资料--第二章:Spark生态系统介绍,Spark整体概述与Spark编程模型(2讲)
概述 什么是Spark ◆ Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
1491 0
|
21天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
55 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
87 0
|
22天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
48 6
下一篇
无影云桌面