Spark SQL【Java API】(2)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Spark SQL【Java API】

Spark SQL【Java API】(1)https://developer.aliyun.com/article/1534328

3、Spark SQL 数据的加载和保存

       Spark SQL 会把读取进来的文件封装为一个 DataFrame 对象(DataSet<Row>),所以 Spark SQL 加载数据源的过程就是创建 DataFrame 的过程。

3.1、创建 DataFrame

这里省去公共的环境代码:

public class Main {
    public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Application名称");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        // 只在提交 Spark Application 时有效
        spark.sparkContext().setLogLevel("WARN");
 
        // 3. 业务代码
 
        // 4. 关闭 sparkSession
        spark.close();
    }
}

3.1.1、通过 JVM 对象创建

注意Spark SQL 中用到的 Java Bean 必须提供 getter 和 setter 、无参构造,而且所有属性必须为 public 修饰的。

        User user1 = new User("汤姆", 11L);
        User user2 = new User("李大喜", 18L);
        User user3 = new User("燕双鹰", 18L);
        User user4 = new User("狄仁杰", 11L);
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(user1, user2, user3, user4), User.class);
        df.show();

这里的 df.show 就相当于注册了一张临时表然后 select * from 这张表。

运行结果:

3.1.2、csv 文件

注意:Spark 读取 csv 文件时,读进来的字段都是 String 类型,所以如果有需求需要把 csv 中的数据封装转为 Bean 的时候,对于任何类型的数据都必须使用 getString 来读取,读取进来再做转换。比如下面,我们把读取进来的 csv 文件使用 map 函数转为 dataset 再做查询

注意通过 csv 读取进来的 DataFrame 并没有 schema 信息,也不能通过 as 方法转为 DataSet 方法,因为 DataFrame 的列名和类型都是 _c0 string , _c1 string ...  和 User 的属性名根本匹配不上,所以只能通过 map 函数来把 DataFrame 转为 DataSet ,这样它才有了类型信息。

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("sep", ",") // 使用 sep 或者 delimiter 效果一样
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        // 转为 dataset 展示
        df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0),Long.parseLong(row.getString(1)),row.getString(2)),
                Encoders.bean(User.class) // 这个 Bean 必须提供 getter 和 setter 方法,否则报错
        ).show();

运行结果:

将结果写入到 csv 文件中

写入到 csv 文件不能通过 DataFrame 直接写,因为现在它连 schema 都没有,sql 中的字段它都识别不了。所以必须先转为 DataSet 再去查询出结果写入到文件:

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("seq", ",")
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        df.printSchema();
 
        // 不能这么转 因为 DataFrame 没有模式信息 字段名默认是 _c0,_c1 ... 和 User 的属性名完全匹配不上 会报错!
        // Dataset<User> ds = df.as(Encoders.bean(User.class));
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0), Long.parseLong(row.getString(1)), row.getString(2)),
                Encoders.bean(User.class)
        );
 
        ds.printSchema();
 
        ds.createOrReplaceTempView("users");
 
        spark.sql("SELECT CONCAT(name,'大侠') name, age FROM users WHERE age > 18")
                .write()
                .option("header",true)
                .option("seq","\t")
                .csv("output");

运行结果:

3.1.3、json 文件

注意:Spark 在读取 json 文件时,默认把 int 类型的值当做 bigint ,如果我们使用 row.getInt 去解析时就会直接报错(因为是小转大),所以我们的 Bean 的整型应该升级为长整型 Long 才不会报错。此外,Spark 读取 json 文件后封装成的 Row 对象是以 json 的字段作为索引的(是根据索引的 ASCII 码进行排序之后再从 0 开始排的),而不是按照 json 文件中的字段顺序,这也是一个坑点。

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(2),row.getLong(0),row.getString(1)),
                Encoders.bean(User.class)
        );
 
        ds.show();

       所以一般不会用上面的这种方式去读取 json,因为我们无法自己预估排序后的字段索引值。我们一般直接把 json 转为 DataFrame 之后立即转为 DataSet 进行操作,或者直接把 DataFrame 对象注册为临时表,然后使用 SQL 进行分析。

将结果写入到 json 文件

       下面我们把 json 读取进来解析为 DataFrame 之后直接注册为临时表——用户表,然后用 sql 进行分析(Spark SQL 支持 HQL 中的所有语法,所以这里试用一下窗口函数):

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        df.createOrReplaceTempView("users");
 
        spark.sql("SELECT name,ROW_NUMBER() OVER(PARTITION BY dept ORDER BY age) rk FROM users")
            .write()
            .json("users_rk");

这里的 "user_rk" 是输出文件的目录名,最终会生成四个文件:两个 CRC 校验文件,一个 SUCCESS 和 生成的 json 文件。

运行结果:

我们这里直接用 DataFrame 来将分析出结果写入到 json 文件,但是上面的 csv 就不可以,因为 json 文件自带字段名,而字段类型 Spark 是可以识别的。

3.2、与 MySQL 交互

导入 MySQL 依赖:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("read from mysql");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        Dataset<Row> df = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/spark")
                .option("user", "root")
                .option("password", "Yan1029.")
                .option("dbtable", "student")
                .load();
 
        df.select("*").show();
 
        spark.close();
    }

3.3、与 Hive 交互

导入依赖:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.1</version>
        </dependency>

拷贝 hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml),然后启动 Hadoop 和 Hive。

    public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("show tables").show();
 
        // 4. 关闭 SparkSession
        spark.close();
    }

运行结果:

4、Spark SQL 练习

4.1、统计每个商品的销量最高的日期

从订单明细表(order_detail)中统计出每种商品销售件数最多的日期及当日销量,如果有同一商品多日销量并列的情况,取其中的最小日期:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum_num FROM (SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM (SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date)t1)t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

上面个的代码就像在写 HQL 一样,我们可以把其中的子表提出来创建为临时表:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date")
                .createOrReplaceTempView("t1");
 
        spark.sql("SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM t1")
                .createOrReplaceTempView("t2");
 
        spark.sql("SELECT sku_id, create_date, sum_num FROM t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

没啥难度,这就是官网说的使用 Spark SQL 或者 HQL 来操作数仓中的数据,之后做个 Spark SQL 项目多练练手就行了。

运行结果:

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
9 2
|
16天前
|
Java API 数据处理
探索Java中的Lambda表达式与Stream API
【10月更文挑战第22天】 在Java编程中,Lambda表达式和Stream API是两个强大的功能,它们极大地简化了代码的编写和提高了开发效率。本文将深入探讨这两个概念的基本用法、优势以及在实际项目中的应用案例,帮助读者更好地理解和运用这些现代Java特性。
|
5天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
1月前
|
分布式计算 Java 大数据
大数据-147 Apache Kudu 常用 Java API 增删改查
大数据-147 Apache Kudu 常用 Java API 增删改查
26 1
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
40 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
75 0
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
34 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
49 0
|
1月前
|
SQL 存储 分布式计算
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
大数据-93 Spark 集群 Spark SQL 概述 基本概念 SparkSQL对比 架构 抽象
37 0
|
JSON 资源调度 分布式计算
基于Yarn API的Spark程序监控
一.简述 通过对Yarn ResourceManager中运行程序的状态(RUNNING、KILLED、FAILED、FINISHED)以及ApplicationMaster中Application的Job执行时长超过批次时间的监控,来达到对Spark on Yarn程序的失败重启、超时重启等功能 二.
4913 0