Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)

2.2.3 DSL特殊语法

1、需求:查询年龄大于18的用户信息

2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test04_AgeGt18_DSL
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:31
 * @Version 1.0
 */
public class Test04_AgeGt18_DSL {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件内容,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
        System.out.println("------------读取数据之后输出jsonDS------------");
        jsonDS.show();
        //4、查询年龄大于18的用户信息
        Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18));
        System.out.println("-------------------输出whereDS的结果-----------------");
        whereDS.show();
        Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18));
        System.out.println("-----------------输出filterDS的结果------------------");
        filterDS.show();
        //5、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3 SQL语法的用户自定义函数

2.3.1 UDF

1、UDF:一进一出

2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;

3、代码实现

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.udf;
/**
 * @ClassName Test05_UDF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:41
 * @Version 1.0
 */
public class Test05_UDF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件,每行按照jsonLine解析
        Dataset<Row> jsonDS = sparkSession.read().json("input/user.json");
        //4、创建UDF函数addName:为输出参数拼接 is numberOne
        UserDefinedFunction addName = udf(new UDF1<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s + " is numberone";
            }
        }, DataTypes.StringType);
        //5、为创建的函数进行注册
        sparkSession.udf().register("addName",addName);
        //6、根据jsonDS创建视图user_info
        jsonDS.createOrReplaceTempView("user_info");
        //7、在SQL中使用函数
        Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info");
        //8、输出结果
        System.out.println("------------------输出结果--------------");
        sqlDS.show();
    }
}

运行结果

2.3.2 UDAF

1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。

2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf

3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame

5、代码实现

(1)创建Buffer类

package code;
import bean.Buffer;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
/**
 * @ClassName Test06_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 9:51
 * @Version 1.0
 */
public class Test06_Buffer extends Aggregator<Long, Buffer,Double> {
    public Test06_Buffer() {
    }
    //初始化
    @Override
    public Buffer zero() {
        return new Buffer(0L,0L);
    }
    //单分区聚合
    @Override
    public Buffer reduce(Buffer b, Long a) {
        b.setSum(b.getSum()+a);
        b.setCount(b.getCount()+1);
        return b;
    }
    //多分区聚合
    @Override
    public Buffer merge(Buffer b1, Buffer b2) {
        b1.setSum(b1.getSum()+ b2.getSum());
        b1.setCount(b1.getCount()+ b2.getCount());
        return b1;
    }
    //最后逻辑运算
    @Override
    public Double finish(Buffer reduction) {
        return reduction.getSum().doubleValue()/ reduction.getCount();
    }
    //设置Buffer的编码格式
    @Override
    public Encoder<Buffer> bufferEncoder() {
        return Encoders.kryo(Buffer.class);
    }
    //设置返回值的编码格式
    @Override
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}

(2)创建自定义函数Test06_UDAF继承 Aggregator

package code;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.udaf;
/**
 * @ClassName Test_07_UDAF
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:11
 * @Version 1.0
 */
public class Test06_UDAF {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、读取文件并且转换为视图user_info
        sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info");
        //4、查询表中所有数据
        Dataset<Row> sqlDS = sparkSession.sql("select * from user_info");
        System.out.println("------------查询全表-----------");
        sqlDS.show();
        //5、注册函数
        sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG()));
        //6、查询全表平均年龄
        Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info");
        System.out.println("--------------查询全表平均年龄----------------");
        sqlDS1.show();
        //7、查询每个人的平均年龄
        Dataset<Row> sqlDS2 = sparkSession.sql("select  name ,MyAvg(age) as avg_age from user_info group by name");
        System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------");
        sqlDS2.show();
        //8、关闭sparkSession
        sparkSession.close();
    }
}

运行结果:

2.3.3 UDTF(没有)

1、UDTF:输入一行,返回多行(Hive)

SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分

3、SparkSQL数据的加载与保存

3.1 读取和保持文件

SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。

3.1.1 CSV文件

0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:

name,age
zhangsan,16
lisi,18,
wangwu,20

1、SparkSQL读取和写出CSV文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
/**
 * @ClassName Test01_CSV
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 10:30
 * @Version 1.0
 */
public class Test01_CSV {
    public static void main(String[] args) {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV");
        //2、创建SparkSession
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        //3、获取Reader对象
        DataFrameReader reader = sparkSession.read();
        //4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符
        Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",")
                .csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv");
        //5、输出
        System.out.println("----------------查看csvDS-------------");
        csvDS.show();
        //6、可以将csvDS转换为user对象的集合
     //   Dataset<User> userDS = csvDS.as(Encoders.bean(User.class));
        /**
         上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint.
         原因:csv读进来都是string
         */
        //7、使用map转换为user之前先将String转换2为Long
        Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() {
            @Override
            public User call(Row value) throws Exception {
                return new User(value.getString(0), Long.valueOf(value.getString(1)));
            }
        }, Encoders.bean(User.class));
        //8、输出
        System.out.println("-------------查看mapDS---------------");
        mapDS.show();
        //TODO 写出为csv文件
        //9、获取写对象
        DataFrameWriter<User> writer = mapDS.write();
        //10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等
        writer.option("seq","\t").mode(SaveMode.Overwrite)
                .option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv");
        //11、关闭SparkSession
        sparkSession.close();
    }
}

运行结果

3.1.2 JSON文件

1、SparkSQL读取和写出JSON文件

package inputandoutput;
import bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test02_JSON
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:24
 * @Version 1.0
 */
public class Test02_JSON {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         jsonDS.show();
         //将jsonDS转换为userDS
         Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class));
         System.out.println("----------查看userDS的数据------------");
         userDS.show();
         //输出为JSON文件
         DataFrameWriter<User> write = userDS.write();
         write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json");
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.1.3 Parquet文件

1、sparkSQL读取和写出列式存储文件。

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
/**
 * @ClassName Test03_Parquet
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:29
 * @Version 1.0
 */
public class Test03_Parquet {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取DS
         DataFrameReader reader = spark.read();
         //我们想读取Parquet,但是没有,没有关系,我们可以先创建
         Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json");
         //ToDO 写出Parquet文件
         //获取写对象
         DataFrameWriter<Row> writer = jsonDS.write();
         //将jsonDS数据写出,以Parquet的格式写出
         writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //Todo 读取Parquet文件
         Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet");
         //输出
         outputParquetDS.show();
         //x、关闭资源
         spark.close();
     }
}

运行结果

3.2 与MySQL交互(前提是自己的数据要有相关的表)

使用SparkSQL对mysql进行读写

1、导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2、从MySQL读写数据

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import java.util.Properties;
/**
 * @ClassName Test04_mysql
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 11:34
 * @Version 1.0
 */
public class Test04_mysql {
     public static void main(String[] args) {
         //1、创建配置对象
         SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql");
         //2、创建SparkSession
         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
         //3、获取reader对象
         DataFrameReader reader = spark.read();
         //4、TODO   读取Mysql中的表的数据
         Properties properties = new Properties();
         properties.put("user","root");
         properties.put("password","123456");
         Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties);
         //5、查看数据
         System.out.println("---------------查看base_provinceDS-------------------");
         base_provinceDS.show();
         //6、查看数据中的偶数
         base_provinceDS.createOrReplaceTempView("base_province");
         Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0");
         //7、查看数据
         System.out.println("----------------数据中的偶数---------------");
         sqlDS.show();
         //Todo 向mysql中写数据
         System.out.println("-------------输出sqlDS数据类型----------------");
         sqlDS.printSchema();
         //8、获取Writer对象
         DataFrameWriter<Row> writer = sqlDS.write();
          writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&amp;useUnicode=true" +
                  "&characterEncoding=UTF-8","test_base_province",properties);
         //x、关闭资源
         spark.close();
     }
}

运行结果:

3.3 与Hive交互

SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。

3.3.1 Linux中的交互

1、添加mysql连接驱动到spark-yarn的jars目录

2、添加hive-site.xml文件到spark-yarn的conf目录

3、启动spark-sql的客户端即可

3.3.2 IDEA中的交互

1、向idea项目中的pom.xml文件中添加新的依赖

(1)添加依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>

(2)拷贝配置文件到resources目录

需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml

(3)记得开启metastore和Hiveserver2的服务

(4)代码实现

package inputandoutput;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
/**
 * @ClassName Test05_Hive
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/1 14:00
 * @Version 1.0
 */
public class Test05_Hive {
    public static void main(String[] args) {
        //1、设置系统用户名称为zhm
        System.setProperty("HADOOP_USER_NAME","zhm");
        //2、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive");
        //3、创建SparkSession
        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport().config(conf).getOrCreate();
        //4、显示所以表
        System.out.println("显示所有的表");
        sparkSession.sql("show tables;").show();
        //5、创建user_info表,字段包括name:string,age:bigint
        sparkSession.sql("create table user_info(name String,age bigint);");
        //6、向user_info表中插入数据,"张三",10
        sparkSession.sql("insert into  user_info values(\"zhangsan\",10);");
        //7、查询user_info表中数据
        System.out.println("------------输出user_info表中的内容--------------");
        sparkSession.sql("select * from user_info;").show();
        //8、关闭SparkSession
        sparkSession.close();
    }
}

运行结果:

恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
4天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
15 0
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
114 1
|
2月前
|
分布式计算 Spark 索引
Spark学习---day07、Spark内核(Shuffle、任务执行)
Spark学习---day07、Spark内核(源码提交流程、任务执行)
|
2月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
2月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
|
分布式计算 Java Spark
|
分布式计算 Java Spark
Spark Streaming 数据清理机制
大家刚开始用Spark Streaming时,心里肯定嘀咕,对于一个7*24小时运行的数据,cache住的RDD,broadcast 系统会帮忙自己清理掉么?还是说必须自己做清理?如果系统帮忙清理的话,机制是啥?
2955 0
|
4月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
161 0
|
18天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
4月前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。