Spark SQL【Java API】(2)

简介: Spark SQL【Java API】

Spark SQL【Java API】(1)https://developer.aliyun.com/article/1534328

3、Spark SQL 数据的加载和保存

       Spark SQL 会把读取进来的文件封装为一个 DataFrame 对象(DataSet<Row>),所以 Spark SQL 加载数据源的过程就是创建 DataFrame 的过程。

3.1、创建 DataFrame

这里省去公共的环境代码:

public class Main {
    public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Application名称");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        // 只在提交 Spark Application 时有效
        spark.sparkContext().setLogLevel("WARN");
 
        // 3. 业务代码
 
        // 4. 关闭 sparkSession
        spark.close();
    }
}

3.1.1、通过 JVM 对象创建

注意Spark SQL 中用到的 Java Bean 必须提供 getter 和 setter 、无参构造,而且所有属性必须为 public 修饰的。

        User user1 = new User("汤姆", 11L);
        User user2 = new User("李大喜", 18L);
        User user3 = new User("燕双鹰", 18L);
        User user4 = new User("狄仁杰", 11L);
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(user1, user2, user3, user4), User.class);
        df.show();

这里的 df.show 就相当于注册了一张临时表然后 select * from 这张表。

运行结果:

3.1.2、csv 文件

注意:Spark 读取 csv 文件时,读进来的字段都是 String 类型,所以如果有需求需要把 csv 中的数据封装转为 Bean 的时候,对于任何类型的数据都必须使用 getString 来读取,读取进来再做转换。比如下面,我们把读取进来的 csv 文件使用 map 函数转为 dataset 再做查询

注意通过 csv 读取进来的 DataFrame 并没有 schema 信息,也不能通过 as 方法转为 DataSet 方法,因为 DataFrame 的列名和类型都是 _c0 string , _c1 string ...  和 User 的属性名根本匹配不上,所以只能通过 map 函数来把 DataFrame 转为 DataSet ,这样它才有了类型信息。

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("sep", ",") // 使用 sep 或者 delimiter 效果一样
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        // 转为 dataset 展示
        df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0),Long.parseLong(row.getString(1)),row.getString(2)),
                Encoders.bean(User.class) // 这个 Bean 必须提供 getter 和 setter 方法,否则报错
        ).show();

运行结果:

将结果写入到 csv 文件中

写入到 csv 文件不能通过 DataFrame 直接写,因为现在它连 schema 都没有,sql 中的字段它都识别不了。所以必须先转为 DataSet 再去查询出结果写入到文件:

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("seq", ",")
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        df.printSchema();
 
        // 不能这么转 因为 DataFrame 没有模式信息 字段名默认是 _c0,_c1 ... 和 User 的属性名完全匹配不上 会报错!
        // Dataset<User> ds = df.as(Encoders.bean(User.class));
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0), Long.parseLong(row.getString(1)), row.getString(2)),
                Encoders.bean(User.class)
        );
 
        ds.printSchema();
 
        ds.createOrReplaceTempView("users");
 
        spark.sql("SELECT CONCAT(name,'大侠') name, age FROM users WHERE age > 18")
                .write()
                .option("header",true)
                .option("seq","\t")
                .csv("output");

运行结果:

3.1.3、json 文件

注意:Spark 在读取 json 文件时,默认把 int 类型的值当做 bigint ,如果我们使用 row.getInt 去解析时就会直接报错(因为是小转大),所以我们的 Bean 的整型应该升级为长整型 Long 才不会报错。此外,Spark 读取 json 文件后封装成的 Row 对象是以 json 的字段作为索引的(是根据索引的 ASCII 码进行排序之后再从 0 开始排的),而不是按照 json 文件中的字段顺序,这也是一个坑点。

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(2),row.getLong(0),row.getString(1)),
                Encoders.bean(User.class)
        );
 
        ds.show();

       所以一般不会用上面的这种方式去读取 json,因为我们无法自己预估排序后的字段索引值。我们一般直接把 json 转为 DataFrame 之后立即转为 DataSet 进行操作,或者直接把 DataFrame 对象注册为临时表,然后使用 SQL 进行分析。

将结果写入到 json 文件

       下面我们把 json 读取进来解析为 DataFrame 之后直接注册为临时表——用户表,然后用 sql 进行分析(Spark SQL 支持 HQL 中的所有语法,所以这里试用一下窗口函数):

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        df.createOrReplaceTempView("users");
 
        spark.sql("SELECT name,ROW_NUMBER() OVER(PARTITION BY dept ORDER BY age) rk FROM users")
            .write()
            .json("users_rk");

这里的 "user_rk" 是输出文件的目录名,最终会生成四个文件:两个 CRC 校验文件,一个 SUCCESS 和 生成的 json 文件。

运行结果:

我们这里直接用 DataFrame 来将分析出结果写入到 json 文件,但是上面的 csv 就不可以,因为 json 文件自带字段名,而字段类型 Spark 是可以识别的。

3.2、与 MySQL 交互

导入 MySQL 依赖:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("read from mysql");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        Dataset<Row> df = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/spark")
                .option("user", "root")
                .option("password", "Yan1029.")
                .option("dbtable", "student")
                .load();
 
        df.select("*").show();
 
        spark.close();
    }

3.3、与 Hive 交互

导入依赖:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.1</version>
        </dependency>

拷贝 hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml),然后启动 Hadoop 和 Hive。

    public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("show tables").show();
 
        // 4. 关闭 SparkSession
        spark.close();
    }

运行结果:

4、Spark SQL 练习

4.1、统计每个商品的销量最高的日期

从订单明细表(order_detail)中统计出每种商品销售件数最多的日期及当日销量,如果有同一商品多日销量并列的情况,取其中的最小日期:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum_num FROM (SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM (SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date)t1)t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

上面个的代码就像在写 HQL 一样,我们可以把其中的子表提出来创建为临时表:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date")
                .createOrReplaceTempView("t1");
 
        spark.sql("SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM t1")
                .createOrReplaceTempView("t2");
 
        spark.sql("SELECT sku_id, create_date, sum_num FROM t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

没啥难度,这就是官网说的使用 Spark SQL 或者 HQL 来操作数仓中的数据,之后做个 Spark SQL 项目多练练手就行了。

运行结果:

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
3月前
|
Java API 数据处理
Java新特性:使用Stream API重构你的数据处理
Java新特性:使用Stream API重构你的数据处理
|
3月前
|
Java 大数据 API
Java Stream API:现代集合处理与函数式编程
Java Stream API:现代集合处理与函数式编程
266 100
|
3月前
|
Java API 数据处理
Java Stream API:现代集合处理新方式
Java Stream API:现代集合处理新方式
299 101
|
3月前
|
并行计算 Java 大数据
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
261 101
|
4月前
|
存储 Java API
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
376 188
|
4月前
|
存储 Java API
Java Stream API:现代数据处理之道
Java Stream API:现代数据处理之道
291 92
|
3月前
|
安全 Java API
使用 Java 构建强大的 REST API 的四个基本技巧
本文结合探险领域案例,分享Java构建REST API的四大核心策略:统一资源命名、版本控制与自动化文档、安全防护及标准化异常处理,助力开发者打造易用、可维护、安全可靠的稳健API服务。
232 2
|
3月前
|
存储 数据可视化 Java
Java Stream API 的强大功能
Java Stream API 是 Java 8 引入的重要特性,它改变了集合数据的处理方式。通过声明式语法,开发者可以更简洁地进行过滤、映射、聚合等操作。Stream API 支持惰性求值和并行处理,提升了代码效率和可读性,是现代 Java 开发不可或缺的工具。
Java Stream API 的强大功能
|
4月前
|
安全 Java API
Java日期时间API:从Date到Java.time
本文深入解析了Java 8中引入的全新日期时间API,涵盖LocalDate、LocalTime、LocalDateTime、ZonedDateTime等核心类的使用,以及时间调整、格式化、时区处理和与旧API的互操作。通过实例对比,展示了新API在可变性、线程安全与易用性方面的显著优势,并提供迁移方案与实战技巧,助你掌握现代Java时间处理的最佳实践。