Spark SQL【Java API】(2)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark SQL【Java API】

Spark SQL【Java API】(1)https://developer.aliyun.com/article/1534328

3、Spark SQL 数据的加载和保存

       Spark SQL 会把读取进来的文件封装为一个 DataFrame 对象(DataSet<Row>),所以 Spark SQL 加载数据源的过程就是创建 DataFrame 的过程。

3.1、创建 DataFrame

这里省去公共的环境代码:

public class Main {
    public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Spark Application名称");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        // 只在提交 Spark Application 时有效
        spark.sparkContext().setLogLevel("WARN");
 
        // 3. 业务代码
 
        // 4. 关闭 sparkSession
        spark.close();
    }
}

3.1.1、通过 JVM 对象创建

注意Spark SQL 中用到的 Java Bean 必须提供 getter 和 setter 、无参构造,而且所有属性必须为 public 修饰的。

        User user1 = new User("汤姆", 11L);
        User user2 = new User("李大喜", 18L);
        User user3 = new User("燕双鹰", 18L);
        User user4 = new User("狄仁杰", 11L);
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(user1, user2, user3, user4), User.class);
        df.show();

这里的 df.show 就相当于注册了一张临时表然后 select * from 这张表。

运行结果:

3.1.2、csv 文件

注意:Spark 读取 csv 文件时,读进来的字段都是 String 类型,所以如果有需求需要把 csv 中的数据封装转为 Bean 的时候,对于任何类型的数据都必须使用 getString 来读取,读取进来再做转换。比如下面,我们把读取进来的 csv 文件使用 map 函数转为 dataset 再做查询

注意通过 csv 读取进来的 DataFrame 并没有 schema 信息,也不能通过 as 方法转为 DataSet 方法,因为 DataFrame 的列名和类型都是 _c0 string , _c1 string ...  和 User 的属性名根本匹配不上,所以只能通过 map 函数来把 DataFrame 转为 DataSet ,这样它才有了类型信息。

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("sep", ",") // 使用 sep 或者 delimiter 效果一样
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        // 转为 dataset 展示
        df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0),Long.parseLong(row.getString(1)),row.getString(2)),
                Encoders.bean(User.class) // 这个 Bean 必须提供 getter 和 setter 方法,否则报错
        ).show();

运行结果:

将结果写入到 csv 文件中

写入到 csv 文件不能通过 DataFrame 直接写,因为现在它连 schema 都没有,sql 中的字段它都识别不了。所以必须先转为 DataSet 再去查询出结果写入到文件:

        // 加载 csv 文件
        Dataset<Row> df = spark.read()
                .option("seq", ",")
                .option("header", false)
                .csv("src/main/resources/csv/user.csv");
 
        df.printSchema();
 
        // 不能这么转 因为 DataFrame 没有模式信息 字段名默认是 _c0,_c1 ... 和 User 的属性名完全匹配不上 会报错!
        // Dataset<User> ds = df.as(Encoders.bean(User.class));
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(0), Long.parseLong(row.getString(1)), row.getString(2)),
                Encoders.bean(User.class)
        );
 
        ds.printSchema();
 
        ds.createOrReplaceTempView("users");
 
        spark.sql("SELECT CONCAT(name,'大侠') name, age FROM users WHERE age > 18")
                .write()
                .option("header",true)
                .option("seq","\t")
                .csv("output");

运行结果:

3.1.3、json 文件

注意:Spark 在读取 json 文件时,默认把 int 类型的值当做 bigint ,如果我们使用 row.getInt 去解析时就会直接报错(因为是小转大),所以我们的 Bean 的整型应该升级为长整型 Long 才不会报错。此外,Spark 读取 json 文件后封装成的 Row 对象是以 json 的字段作为索引的(是根据索引的 ASCII 码进行排序之后再从 0 开始排的),而不是按照 json 文件中的字段顺序,这也是一个坑点。

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        Dataset<User> ds = df.map(
                (MapFunction<Row, User>) row -> new User(row.getString(2),row.getLong(0),row.getString(1)),
                Encoders.bean(User.class)
        );
 
        ds.show();

       所以一般不会用上面的这种方式去读取 json,因为我们无法自己预估排序后的字段索引值。我们一般直接把 json 转为 DataFrame 之后立即转为 DataSet 进行操作,或者直接把 DataFrame 对象注册为临时表,然后使用 SQL 进行分析。

将结果写入到 json 文件

       下面我们把 json 读取进来解析为 DataFrame 之后直接注册为临时表——用户表,然后用 sql 进行分析(Spark SQL 支持 HQL 中的所有语法,所以这里试用一下窗口函数):

        Dataset<Row> df = spark.read().json("src/main/resources/json/user.json");
 
        df.createOrReplaceTempView("users");
 
        spark.sql("SELECT name,ROW_NUMBER() OVER(PARTITION BY dept ORDER BY age) rk FROM users")
            .write()
            .json("users_rk");

这里的 "user_rk" 是输出文件的目录名,最终会生成四个文件:两个 CRC 校验文件,一个 SUCCESS 和 生成的 json 文件。

运行结果:

我们这里直接用 DataFrame 来将分析出结果写入到 json 文件,但是上面的 csv 就不可以,因为 json 文件自带字段名,而字段类型 Spark 是可以识别的。

3.2、与 MySQL 交互

导入 MySQL 依赖:

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
public static void main(String[] args) {
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("read from mysql");
 
        // 2. 创建 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
 
        Dataset<Row> df = spark.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://127.0.0.1:3306/spark")
                .option("user", "root")
                .option("password", "Yan1029.")
                .option("dbtable", "student")
                .load();
 
        df.select("*").show();
 
        spark.close();
    }

3.3、与 Hive 交互

导入依赖:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.1</version>
        </dependency>

拷贝 hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml),然后启动 Hadoop 和 Hive。

    public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("show tables").show();
 
        // 4. 关闭 SparkSession
        spark.close();
    }

运行结果:

4、Spark SQL 练习

4.1、统计每个商品的销量最高的日期

从订单明细表(order_detail)中统计出每种商品销售件数最多的日期及当日销量,如果有同一商品多日销量并列的情况,取其中的最小日期:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum_num FROM (SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM (SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date)t1)t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

上面个的代码就像在写 HQL 一样,我们可以把其中的子表提出来创建为临时表:

public static void main(String[] args) {
 
        System.setProperty("HADOOP_USER_NAME","lyh");
 
        // 1. 创建配置对象
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark sql operate hive");
 
        // 2. 获取 SparkSession
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport() // 添加 hive 支持
                .config(conf)
                .getOrCreate();
 
        spark.sql("use db_hive2");
        // order_detail_id  order_id  sku_id  create_date price   sku_num
        // 每件商品的最高销量
        spark.sql("SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date")
                .createOrReplaceTempView("t1");
 
        spark.sql("SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM t1")
                .createOrReplaceTempView("t2");
 
        spark.sql("SELECT sku_id, create_date, sum_num FROM t2 WHERE rk = 1").show();
        // 4. 关闭 SparkSession
        spark.close();
    }

没啥难度,这就是官网说的使用 Spark SQL 或者 HQL 来操作数仓中的数据,之后做个 Spark SQL 项目多练练手就行了。

运行结果:

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2天前
|
Java API 索引
java中String类常用API
java中String类常用API
|
2天前
|
Java API
java调用个人微信API接口收发朋友圈,删除评论朋友圈
java调用个人微信API接口收发朋友圈,删除评论朋友圈
|
4天前
|
存储 Java 大数据
Java Stream API
Java Stream API
9 1
|
2天前
|
Java API 索引
java中ArrayList类常用API
java中ArrayList类常用API
|
2天前
|
Java API
备战第十五届蓝桥杯Java软件开发大学B组常见API记录
备战第十五届蓝桥杯Java软件开发大学B组常见API记录
5 0
|
2天前
|
Java API 开发工具
企业微信api,企业微信sdk接口java调用源码
企业微信api,企业微信sdk接口java调用源码
|
2天前
|
Java API 开发工具
个人微信api接口java调用源代码
个人微信api接口java调用源代码
|
分布式计算 监控 安全
威胁快报| 首个Spark REST API未授权漏洞利用分析
2018年7月7日,阿里云安全首次捕获Spark REST API的未授权RCE漏洞进行攻击的真实样本。7月9号起,阿里云平台已能默认防御此漏洞的大规模利用。 这是首次在真实攻击中发现使用“暗网”来传播恶意后门的样本,预计未来这一趋势会逐步扩大。
1957 0
|
1月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
6天前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
34 7