这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;
但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;
最后想说一句君子不隐其短,不知则问,不能则学。
如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)
一、SparkSQL概述
1.1 什么是SparkSQL
Spark是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,SparkSQL使用这些额外的信息来执行额外的优化。与SparkSQL交互的方式有很多种,包括SQL和DatasetAPI。结算时,使用相同的执行引擎,与你用于表计算的API/语言无关。
1.2 为什么要有SparkSQL
1.3 SparkSQL的发展
1、发展历史
RDD(Spark1.0)=> Dateframe(Spark1.3) =>Dataset(Spark1.6)
如果同样的数据都给到这三个数据结构,它们分别计算之后,都会给出相同的结果。
不同的是它们执行效率和执行方式。在现在的版本中,dataset性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看作是特殊泛型的DataSet。
2、三者的共性
(1)RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理大型数据通过便利。
(2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子实,三者才会开始遍历运算。
(3)三者有许多共同的函数,例如filter,sortby等
(4)三者都会根据Spark的内存情况自动缓存运算。
(5)三者都有分区的概念
1.4 SparkSQL的特点
1、易整合:无缝的整合了SQL查询和Spark编程
2、统一的数据访问方式:使用相同的方式连接不同的数据源
3、兼容Hive:在已有的仓库上直接运行SQL或者HQL
4、标准的数据连接:通过JDBC或者ODBC来连接
二、SparkSQL 编程
2.1 SparkSession 新的起始点
在老的版本中,SparkSQL提供两种SQL查询起始点:
(1) 一个叫SQLContext,用于Spark自己提供的SQL查询;
(2)一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
2.2 常用方式
2.2.1 方法调用案例实操
1、创建一个maven工程SparkSQL
2、输入文件夹准备:在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.json。并输入如下内容:
{"age":20,"name":"qiaofeng"} {"age":19,"name":"xuzhu"} {"age":18,"name":"duanyu"} {"age":22,"name":"qiaofeng"} {"age":11,"name":"xuzhu"} {"age":12,"name":"duanyu"}
4、需求
统计人次和统计每人最大年龄
5、在pom.xml文件中添加spark-sql的依赖
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies>
6、代码实现
(1)添加javaBean的User
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import java.io.Serializable; /** * @ClassName User * @Description TODO * @Author Zouhuiming * @Date 2023/6/30 20:45 * @Version 1.0 */ @AllArgsConstructor @NoArgsConstructor @Data @ToString public class User implements Serializable { private String name; private Long age; }
(2)统计每个人的最大年龄
import bean.User; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; import scala.Tuple2; /** * @ClassName Test01_MaxAge * @Description TODO * @Author Zouhuiming * @Date 2023/6/30 20:48 * @Version 1.0 */ public class Test01_MaxAge { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_MaxAge"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取DS Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); //4、将json中的行转换为User对象 Dataset<User> mapDS = jsonDS.map(new MapFunction<Row, User>() { @Override public User call(Row value) throws Exception { return new User(value.getString(1), value.getLong(0)); } }, Encoders.bean(User.class)); System.out.println("--------由jsonDS转换为UserDS后输出--------"); mapDS.show(); //5、分组 KeyValueGroupedDataset<String, User> groupByKeyDS = mapDS.groupByKey(new MapFunction<User, String>() { @Override public String call(User value) throws Exception { return value.getName(); } }, Encoders.STRING()); //6、统计,取最大值 Dataset<Tuple2<String, User>> resultDS = groupByKeyDS.reduceGroups(new ReduceFunction<User>() { @Override public User call(User v1, User v2) throws Exception { return new User( v1.getName(),Math.max(v1.getAge(), v2.getAge())); } }); System.out.println("--------------输出结果-------------"); resultDS.show(); //x、关闭资源 spark.close(); } }
运行结果:
(3)统计人次代码编写
import bean.User; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; /** * @ClassName Test02_UserVisitsCount * @Description TODO * @Author Zouhuiming * @Date 2023/6/30 21:18 * @Version 1.0 */ public class Test02_UserVisitsCount { /** * 在sparkSql中DS直接支持的转换算子有: * map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、 * filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。 */ public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_UserVisitsCount"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取DS(从文件中按行读取数据文件) Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); //4、将jsonDS中每行数据转换为对象 Dataset<User> UserDS = jsonDS.as(Encoders.bean(User.class)); System.out.println("-------------由jsonDS转换为UserDS输出-------------"); UserDS.show(); //5、根据字段分组 RelationalGroupedDataset groupByDS = UserDS.groupBy(new Column("name")); System.out.println("--------由UserDS根据字段Name分组后输出----------"); System.out.println(groupByDS.toString()); //6、统计次数 Dataset<Row> countDS = groupByDS.count(); System.out.println("--------------由分组后的groupDS进行count统计后输出-------------"); countDS.show(); //x、关闭资源 spark.close(); } }
运行结果:
在SparkSQL中DS直接支持的转换算子有:
map(底层已经优化为mapPartition)、mapPartition·、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式算子,不够不影响使用)。
2.2.2 SQL使用方式
1、需求:查询年龄大于18岁的用户信息
2、代码编写
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * @ClassName Test03_AgeGt18 * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 9:24 * @Version 1.0 */ public class Test03_AgeGt18_sql { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_AgeGt18_sql"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取DS Dataset<Row> jsonDS = spark.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); System.out.println("------------读取数据之后输出jsonDS------------"); jsonDS.show(); //4、根据jsonDS创建临时视图,视图的生命周期和sparkSession绑定,"orReplace"表示可覆盖 jsonDS.createOrReplaceTempView("user_info"); //5、查询数据中年龄大于18的用户 Dataset<Row> sql = spark.sql("select age,name from user_info where age>18"); //6、输出结果 System.out.println("---------------输出年龄大于18的结果---------------"); sql.show(); //x、关闭资源 spark.close(); } }
运行结果: