Spark SQL【Java API】(1)https://developer.aliyun.com/article/1534328
3、Spark SQL 数据的加载和保存
Spark SQL 会把读取进来的文件封装为一个 DataFrame 对象(DataSet<Row>),所以 Spark SQL 加载数据源的过程就是创建 DataFrame 的过程。
3.1、创建 DataFrame
这里省去公共的环境代码:
public class Main { public static void main(String[] args) { // 1. 创建配置对象 SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("Spark Application名称"); // 2. 创建 SparkSession SparkSession spark = SparkSession .builder() .config(conf) .getOrCreate(); // 只在提交 Spark Application 时有效 spark.sparkContext().setLogLevel("WARN"); // 3. 业务代码 // 4. 关闭 sparkSession spark.close(); } }
3.1.1、通过 JVM 对象创建
注意:Spark SQL 中用到的 Java Bean 必须提供 getter 和 setter 、无参构造,而且所有属性必须为 public 修饰的。
User user1 = new User("汤姆", 11L); User user2 = new User("李大喜", 18L); User user3 = new User("燕双鹰", 18L); User user4 = new User("狄仁杰", 11L); Dataset<Row> df = spark.createDataFrame(Arrays.asList(user1, user2, user3, user4), User.class); df.show();
这里的 df.show 就相当于注册了一张临时表然后 select * from 这张表。
运行结果:
3.1.2、csv 文件
注意:Spark 读取 csv 文件时,读进来的字段都是 String 类型,所以如果有需求需要把 csv 中的数据封装转为 Bean 的时候,对于任何类型的数据都必须使用 getString 来读取,读取进来再做转换。比如下面,我们把读取进来的 csv 文件使用 map 函数转为 dataset 再做查询
注意:通过 csv 读取进来的 DataFrame 并没有 schema 信息,也不能通过 as 方法转为 DataSet 方法,因为 DataFrame 的列名和类型都是 _c0 string , _c1 string ... 和 User 的属性名根本匹配不上,所以只能通过 map 函数来把 DataFrame 转为 DataSet ,这样它才有了类型信息。
// 加载 csv 文件 Dataset<Row> df = spark.read() .option("sep", ",") // 使用 sep 或者 delimiter 效果一样 .option("header", false) .csv("src/main/resources/csv/user.csv"); // 转为 dataset 展示 df.map( (MapFunction<Row, User>) row -> new User(row.getString(0),Long.parseLong(row.getString(1)),row.getString(2)), Encoders.bean(User.class) // 这个 Bean 必须提供 getter 和 setter 方法,否则报错 ).show();
运行结果:
将结果写入到 csv 文件中:
写入到 csv 文件不能通过 DataFrame 直接写,因为现在它连 schema 都没有,sql 中的字段它都识别不了。所以必须先转为 DataSet 再去查询出结果写入到文件:
// 加载 csv 文件 Dataset<Row> df = spark.read() .option("seq", ",") .option("header", false) .csv("src/main/resources/csv/user.csv"); df.printSchema(); // 不能这么转 因为 DataFrame 没有模式信息 字段名默认是 _c0,_c1 ... 和 User 的属性名完全匹配不上 会报错! // Dataset<User> ds = df.as(Encoders.bean(User.class)); Dataset<User> ds = df.map( (MapFunction<Row, User>) row -> new User(row.getString(0), Long.parseLong(row.getString(1)), row.getString(2)), Encoders.bean(User.class) ); ds.printSchema(); ds.createOrReplaceTempView("users"); spark.sql("SELECT CONCAT(name,'大侠') name, age FROM users WHERE age > 18") .write() .option("header",true) .option("seq","\t") .csv("output");
运行结果:
3.1.3、json 文件
注意:Spark 在读取 json 文件时,默认把 int 类型的值当做 bigint ,如果我们使用 row.getInt 去解析时就会直接报错(因为是小转大),所以我们的 Bean 的整型应该升级为长整型 Long 才不会报错。此外,Spark 读取 json 文件后封装成的 Row 对象是以 json 的字段作为索引的(是根据索引的 ASCII 码进行排序之后再从 0 开始排的),而不是按照 json 文件中的字段顺序,这也是一个坑点。
Dataset<Row> df = spark.read().json("src/main/resources/json/user.json"); Dataset<User> ds = df.map( (MapFunction<Row, User>) row -> new User(row.getString(2),row.getLong(0),row.getString(1)), Encoders.bean(User.class) ); ds.show();
所以一般不会用上面的这种方式去读取 json,因为我们无法自己预估排序后的字段索引值。我们一般直接把 json 转为 DataFrame 之后立即转为 DataSet 进行操作,或者直接把 DataFrame 对象注册为临时表,然后使用 SQL 进行分析。
将结果写入到 json 文件:
下面我们把 json 读取进来解析为 DataFrame 之后直接注册为临时表——用户表,然后用 sql 进行分析(Spark SQL 支持 HQL 中的所有语法,所以这里试用一下窗口函数):
Dataset<Row> df = spark.read().json("src/main/resources/json/user.json"); df.createOrReplaceTempView("users"); spark.sql("SELECT name,ROW_NUMBER() OVER(PARTITION BY dept ORDER BY age) rk FROM users") .write() .json("users_rk");
这里的 "user_rk" 是输出文件的目录名,最终会生成四个文件:两个 CRC 校验文件,一个 SUCCESS 和 生成的 json 文件。
运行结果:
我们这里直接用 DataFrame 来将分析出结果写入到 json 文件,但是上面的 csv 就不可以,因为 json 文件自带字段名,而字段类型 Spark 是可以识别的。
3.2、与 MySQL 交互
导入 MySQL 依赖:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.31</version> </dependency>
public static void main(String[] args) { // 1. 创建配置对象 SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("read from mysql"); // 2. 创建 SparkSession SparkSession spark = SparkSession .builder() .config(conf) .getOrCreate(); Dataset<Row> df = spark.read() .format("jdbc") .option("url", "jdbc:mysql://127.0.0.1:3306/spark") .option("user", "root") .option("password", "Yan1029.") .option("dbtable", "student") .load(); df.select("*").show(); spark.close(); }
3.3、与 Hive 交互
导入依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.3.1</version> </dependency>
拷贝 hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml),然后启动 Hadoop 和 Hive。
public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME","lyh"); // 1. 创建配置对象 SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("spark sql operate hive"); // 2. 获取 SparkSession SparkSession spark = SparkSession .builder() .enableHiveSupport() // 添加 hive 支持 .config(conf) .getOrCreate(); spark.sql("show tables").show(); // 4. 关闭 SparkSession spark.close(); }
运行结果:
4、Spark SQL 练习
4.1、统计每个商品的销量最高的日期
从订单明细表(order_detail)中统计出每种商品销售件数最多的日期及当日销量,如果有同一商品多日销量并列的情况,取其中的最小日期:
public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME","lyh"); // 1. 创建配置对象 SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("spark sql operate hive"); // 2. 获取 SparkSession SparkSession spark = SparkSession .builder() .enableHiveSupport() // 添加 hive 支持 .config(conf) .getOrCreate(); spark.sql("use db_hive2"); // order_detail_id order_id sku_id create_date price sku_num // 每件商品的最高销量 spark.sql("SELECT sku_id, create_date, sum_num FROM (SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM (SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date)t1)t2 WHERE rk = 1").show(); // 4. 关闭 SparkSession spark.close(); }
上面个的代码就像在写 HQL 一样,我们可以把其中的子表提出来创建为临时表:
public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME","lyh"); // 1. 创建配置对象 SparkConf conf = new SparkConf() .setMaster("local[*]") .setAppName("spark sql operate hive"); // 2. 获取 SparkSession SparkSession spark = SparkSession .builder() .enableHiveSupport() // 添加 hive 支持 .config(conf) .getOrCreate(); spark.sql("use db_hive2"); // order_detail_id order_id sku_id create_date price sku_num // 每件商品的最高销量 spark.sql("SELECT sku_id, create_date, sum(sku_num) sum_num FROM order_detail GROUP BY sku_id,create_date") .createOrReplaceTempView("t1"); spark.sql("SELECT sku_id, create_date, sum_num, ROW_NUMBER() OVER(PARTITION BY sku_id ORDER BY sum_num DESC,create_date ASC) rk FROM t1") .createOrReplaceTempView("t2"); spark.sql("SELECT sku_id, create_date, sum_num FROM t2 WHERE rk = 1").show(); // 4. 关闭 SparkSession spark.close(); }
没啥难度,这就是官网说的使用 Spark SQL 或者 HQL 来操作数仓中的数据,之后做个 Spark SQL 项目多练练手就行了。
运行结果: