2.2.3 DSL特殊语法
1、需求:查询年龄大于18的用户信息
2、特点:需要导入特殊的依赖:import static org.apache.spark.sql.functions.col;
3、代码实现
import org.apache.spark.SparkConf; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * @ClassName Test04_AgeGt18_DSL * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 9:31 * @Version 1.0 */ public class Test04_AgeGt18_DSL { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_AgeGt18_DSL"); //2、创建SparkSession SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); //3、读取文件内容,每行按照jsonLine解析 Dataset<Row> jsonDS = sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); System.out.println("------------读取数据之后输出jsonDS------------"); jsonDS.show(); //4、查询年龄大于18的用户信息 Dataset<Row> whereDS = jsonDS.select(new Column("name"), new Column("age")).where(new Column("age").gt(18)); System.out.println("-------------------输出whereDS的结果-----------------"); whereDS.show(); Dataset<Row> filterDS = jsonDS.select(new Column("name"), new Column("age").plus(1).as("newAge")).filter(new Column("age").gt(18)); System.out.println("-----------------输出filterDS的结果------------------"); filterDS.show(); //5、关闭sparkSession sparkSession.close(); } }
运行结果:
2.3 SQL语法的用户自定义函数
2.3.1 UDF
1、UDF:一进一出
2、特点:需要导入依赖:import static org.apache.spark.sql.functions.udf;
3、代码实现
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.types.DataTypes; import static org.apache.spark.sql.functions.udf; /** * @ClassName Test05_UDF * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 9:41 * @Version 1.0 */ public class Test05_UDF { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_UDF"); //2、创建SparkSession SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); //3、读取文件,每行按照jsonLine解析 Dataset<Row> jsonDS = sparkSession.read().json("input/user.json"); //4、创建UDF函数addName:为输出参数拼接 is numberOne UserDefinedFunction addName = udf(new UDF1<String, String>() { @Override public String call(String s) throws Exception { return s + " is numberone"; } }, DataTypes.StringType); //5、为创建的函数进行注册 sparkSession.udf().register("addName",addName); //6、根据jsonDS创建视图user_info jsonDS.createOrReplaceTempView("user_info"); //7、在SQL中使用函数 Dataset<Row> sqlDS = sparkSession.sql("select addName(name) as newName,age from user_info"); //8、输出结果 System.out.println("------------------输出结果--------------"); sqlDS.show(); } }
运行结果
2.3.2 UDAF
1、UDAF:输入多行,返回一行,通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有数据合并在一起。
2、特点:需要引入依赖:import static org.apache.spark.sql.functions.udaf
3、Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。
4、Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame
5、代码实现
(1)创建Buffer类
package code; import bean.Buffer; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; /** * @ClassName Test06_UDAF * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 9:51 * @Version 1.0 */ public class Test06_Buffer extends Aggregator<Long, Buffer,Double> { public Test06_Buffer() { } //初始化 @Override public Buffer zero() { return new Buffer(0L,0L); } //单分区聚合 @Override public Buffer reduce(Buffer b, Long a) { b.setSum(b.getSum()+a); b.setCount(b.getCount()+1); return b; } //多分区聚合 @Override public Buffer merge(Buffer b1, Buffer b2) { b1.setSum(b1.getSum()+ b2.getSum()); b1.setCount(b1.getCount()+ b2.getCount()); return b1; } //最后逻辑运算 @Override public Double finish(Buffer reduction) { return reduction.getSum().doubleValue()/ reduction.getCount(); } //设置Buffer的编码格式 @Override public Encoder<Buffer> bufferEncoder() { return Encoders.kryo(Buffer.class); } //设置返回值的编码格式 @Override public Encoder<Double> outputEncoder() { return Encoders.DOUBLE(); } }
(2)创建自定义函数Test06_UDAF继承 Aggregator
package code; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import static org.apache.spark.sql.functions.udaf; /** * @ClassName Test_07_UDAF * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 10:11 * @Version 1.0 */ public class Test06_UDAF { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test07_UDAF"); //2、创建SparkSession SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); //3、读取文件并且转换为视图user_info sparkSession.read().json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json").createOrReplaceTempView("user_info"); //4、查询表中所有数据 Dataset<Row> sqlDS = sparkSession.sql("select * from user_info"); System.out.println("------------查询全表-----------"); sqlDS.show(); //5、注册函数 sparkSession.udf().register("MyAvg",udaf(new Test06_Buffer(), Encoders.LONG())); //6、查询全表平均年龄 Dataset<Row> sqlDS1 = sparkSession.sql("select MyAvg(age) avg_age from user_info"); System.out.println("--------------查询全表平均年龄----------------"); sqlDS1.show(); //7、查询每个人的平均年龄 Dataset<Row> sqlDS2 = sparkSession.sql("select name ,MyAvg(age) as avg_age from user_info group by name"); System.out.println("--------------------查询每个人的平均年龄(自定义函数)-------------------------"); sqlDS2.show(); //8、关闭sparkSession sparkSession.close(); } }
运行结果:
2.3.3 UDTF(没有)
1、UDTF:输入一行,返回多行(Hive)
SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分
3、SparkSQL数据的加载与保存
3.1 读取和保持文件
SparkSQL读取和保存的文件一般分为三种,JSON文件、CSV文件和列示储存文件,同时可以通过添加参数来识别不同的储存和压缩格式。
3.1.1 CSV文件
0、在新建的SparkSQL项目名称上右键=》新建input文件夹=》在input文件夹上右键=》新建user.csv。并输入如下内容:
name,age zhangsan,16 lisi,18, wangwu,20
1、SparkSQL读取和写出CSV文件
package inputandoutput; import bean.User; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; /** * @ClassName Test01_CSV * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 10:30 * @Version 1.0 */ public class Test01_CSV { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_CSV"); //2、创建SparkSession SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate(); //3、获取Reader对象 DataFrameReader reader = sparkSession.read(); //4、读取CSV文件,同时可以通过参数指定是否读取列名和指定分割符 Dataset<Row> csvDS = reader.option("header", "true").option("sep", ",") .csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.csv"); //5、输出 System.out.println("----------------查看csvDS-------------"); csvDS.show(); //6、可以将csvDS转换为user对象的集合 // Dataset<User> userDS = csvDS.as(Encoders.bean(User.class)); /** 上述直接转换会报错:AnalysisException: Cannot up cast `age` from string to bigint. 原因:csv读进来都是string */ //7、使用map转换为user之前先将String转换2为Long Dataset<User> mapDS = csvDS.map(new MapFunction<Row, User>() { @Override public User call(Row value) throws Exception { return new User(value.getString(0), Long.valueOf(value.getString(1))); } }, Encoders.bean(User.class)); //8、输出 System.out.println("-------------查看mapDS---------------"); mapDS.show(); //TODO 写出为csv文件 //9、获取写对象 DataFrameWriter<User> writer = mapDS.write(); //10、写出时,可以通过参数方式指定:压缩、分隔符,写入模式、是否带有列名等 writer.option("seq","\t").mode(SaveMode.Overwrite) .option("header","true").csv("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.csv"); //11、关闭SparkSession sparkSession.close(); } }
运行结果
3.1.2 JSON文件
1、SparkSQL读取和写出JSON文件
package inputandoutput; import bean.User; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; /** * @ClassName Test02_JSON * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 11:24 * @Version 1.0 */ public class Test02_JSON { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_JSON"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取DS DataFrameReader reader = spark.read(); Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); jsonDS.show(); //将jsonDS转换为userDS Dataset<User> userDS = jsonDS.as(Encoders.bean(User.class)); System.out.println("----------查看userDS的数据------------"); userDS.show(); //输出为JSON文件 DataFrameWriter<User> write = userDS.write(); write.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.json"); //x、关闭资源 spark.close(); } }
运行结果:
3.1.3 Parquet文件
1、sparkSQL读取和写出列式存储文件。
package inputandoutput; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; /** * @ClassName Test03_Parquet * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 11:29 * @Version 1.0 */ public class Test03_Parquet { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test03_Parquet"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取DS DataFrameReader reader = spark.read(); //我们想读取Parquet,但是没有,没有关系,我们可以先创建 Dataset<Row> jsonDS = reader.json("file:///E:\\maven-workspace\\space520\\SparkSQL\\input\\user.json"); //ToDO 写出Parquet文件 //获取写对象 DataFrameWriter<Row> writer = jsonDS.write(); //将jsonDS数据写出,以Parquet的格式写出 writer.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet"); //Todo 读取Parquet文件 Dataset<Row> outputParquetDS = reader.parquet("file:///E:\\maven-workspace\\space520\\SparkSQL\\output\\user.parquet"); //输出 outputParquetDS.show(); //x、关闭资源 spark.close(); } }
运行结果
3.2 与MySQL交互(前提是自己的数据要有相关的表)
使用SparkSQL对mysql进行读写
1、导入依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
2、从MySQL读写数据
package inputandoutput; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; import java.util.Properties; /** * @ClassName Test04_mysql * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 11:34 * @Version 1.0 */ public class Test04_mysql { public static void main(String[] args) { //1、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test04_mysql"); //2、创建SparkSession SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //3、获取reader对象 DataFrameReader reader = spark.read(); //4、TODO 读取Mysql中的表的数据 Properties properties = new Properties(); properties.put("user","root"); properties.put("password","123456"); Dataset<Row> base_provinceDS = reader.jdbc("jdbc:mysql://hadoop102:3306/gmall", "base_province", properties); //5、查看数据 System.out.println("---------------查看base_provinceDS-------------------"); base_provinceDS.show(); //6、查看数据中的偶数 base_provinceDS.createOrReplaceTempView("base_province"); Dataset<Row> sqlDS = spark.sql("select * from base_province where id%2=0"); //7、查看数据 System.out.println("----------------数据中的偶数---------------"); sqlDS.show(); //Todo 向mysql中写数据 System.out.println("-------------输出sqlDS数据类型----------------"); sqlDS.printSchema(); //8、获取Writer对象 DataFrameWriter<Row> writer = sqlDS.write(); writer.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true" + "&characterEncoding=UTF-8","test_base_province",properties); //x、关闭资源 spark.close(); } }
运行结果:
3.3 与Hive交互
SparkSQL可以采用内嵌Hive(Spark开箱即用的Hive),也可以采用外部Hive。企业开发中常用外部Hive。
3.3.1 Linux中的交互
1、添加mysql连接驱动到spark-yarn的jars目录
2、添加hive-site.xml文件到spark-yarn的conf目录
3、启动spark-sql的客户端即可
3.3.2 IDEA中的交互
1、向idea项目中的pom.xml文件中添加新的依赖
(1)添加依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.3.0</version> </dependency>
(2)拷贝配置文件到resources目录
需要拷贝hive-site.xml,hdfs-site.xml、core-site.xml、yarn-site.xml
(3)记得开启metastore和Hiveserver2的服务
(4)代码实现
package inputandoutput; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; /** * @ClassName Test05_Hive * @Description TODO * @Author Zouhuiming * @Date 2023/7/1 14:00 * @Version 1.0 */ public class Test05_Hive { public static void main(String[] args) { //1、设置系统用户名称为zhm System.setProperty("HADOOP_USER_NAME","zhm"); //2、创建配置对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test05_Hive"); //3、创建SparkSession SparkSession sparkSession = SparkSession.builder() .enableHiveSupport().config(conf).getOrCreate(); //4、显示所以表 System.out.println("显示所有的表"); sparkSession.sql("show tables;").show(); //5、创建user_info表,字段包括name:string,age:bigint sparkSession.sql("create table user_info(name String,age bigint);"); //6、向user_info表中插入数据,"张三",10 sparkSession.sql("insert into user_info values(\"zhangsan\",10);"); //7、查询user_info表中数据 System.out.println("------------输出user_info表中的内容--------------"); sparkSession.sql("select * from user_info;").show(); //8、关闭SparkSession sparkSession.close(); } }
运行结果:
恭喜大家看完了小编的博客,希望你能够有所收获,我一直相信躬身自问和沉思默想会充实我们的头脑,希望大家看完之后可以多想想喽,编辑不易,求关注、点赞、收藏(Thanks♪(・ω・)ノ)。