点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
SparkSQL 核心操作
Action操作 详细解释+测试案例
Transformation操作 详细解释+测试案例
SQL 语句
总体而言:SparkSQL语HQL兼容;与HQL相比,SparkSQL更简洁。
SparkSQL是Apache Spark框架中的一个模块,专门用于处理结构化和半结构化数据。它提供了对数据进行查询、处理和分析的高级接口。
SparkSQL的核心特点包括:
DataFrame API:SparkSQL提供了DataFrame API,它是一种以行和列为结构的数据集,与关系数据库中的表非常相似。DataFrame支持多种数据源,如Hive、Parquet、JSON、JDBC等,可以轻松地将数据导入并进行操作。
SQL查询:SparkSQL允许用户通过标准的SQL语法查询DataFrame,这使得数据分析师和工程师可以使用他们熟悉的SQL语言来处理大数据。SparkSQL会自动将SQL查询转换为底层的RDD操作,从而在分布式环境中执行。
与Hive集成:SparkSQL可以与Hive无缝集成,使用Hive的元数据和查询引擎。它支持HiveQL(Hive Query Language)语法,并且能够直接访问Hive中的数据。
性能优化:SparkSQL采用了多种优化技术,如Catalyst查询优化器和Tungsten物理执行引擎。这些优化技术能够自动生成高效的执行计划,提高查询的执行速度。
数据样例
// 数据 1 1,2,3 2 2,3 3 1,2 // 需要实现如下的效果 1 1 1 2 1 3 2 2 2 3 3 1 3 2
编写代码
package icu.wzk import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.Encoders case class Info(id: String, tags: String) object SparkSql01 { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("SparkSQLDemo") .master("local[*]") .getOrCreate() val sc = sparkSession.sparkContext sc.setLogLevel("WARN") val arr = Array("1 1,2,3", "2 2,3", "3 1,2") val rdd: RDD[Info] = sc .makeRDD(arr) .map{ line => val fields: Array[String] = line.split("\\s+") Info(fields(0), fields(1)) } import sparkSession.implicits._ implicit val infoEncoder = Encoders.product[Info] val ds: Dataset[Info] = sparkSession.createDataset(rdd) ds.createOrReplaceTempView("t1") sparkSession.sql( """ | select id, tag | from t1 | lateral view explode(split(tags, ",")) t2 as tag |""".stripMargin ).show sparkSession.sql( """ | select id, explode(split(tags, ",")) | from t1 |""".stripMargin ).show sparkSession.close() } }
运行测试
控制台输出结果为:
+---+---+ | id|tag| +---+---+ | 1| 1| | 1| 2| | 1| 3| | 2| 2| | 2| 3| | 3| 1| | 3| 2| +---+---+ +---+---+ | id|col| +---+---+ | 1| 1| | 1| 2| | 1| 3| | 2| 2| | 2| 3| | 3| 1| | 3| 2| +---+---+
运行结果
运行结果如下图所示:
输入与输出
SparkSQL 内建支持的数据源包括:
Parquet (默认数据源)
JSON
CSV
Avro
Images
BinaryFiles(Spark 3.0)
简单介绍一下,Parquet 是一种列式存储格式,专门为大数据处理和分析而设计。
列式存储:Parquet 采用列式存储格式,这意味着同一列的数据存储在一起。这样可以极大地提高查询性能,尤其是当查询只涉及少量列时。
高效压缩:由于同一列的数据具有相似性,Parquet 能够更高效地进行压缩,节省存储空间。
支持复杂数据类型:Parquet 支持嵌套的数据结构,包括嵌套列表、映射和结构体,这使得它非常适合处理复杂的、半结构化的数据。
跨平台:Parquet 是一种开放标准,支持多种编程语言和数据处理引擎,包括 Apache Spark、Hadoop、Impala 等。
Parquet
特点:Parquet是一种列式存储格式,特别适合大规模数据的存储和处理。它支持压缩和嵌套数据结构,因此在存储效率和读取性能方面表现优异。
使用方式:spark.read.parquet(“path/to/data”) 读取Parquet文件;df.write.parquet(“path/to/output”) 将DataFrame保存为Parquet格式。
JSON
特点:JSON是一种轻量级的数据交换格式,广泛用于Web应用程序和NoSQL数据库中。SparkSQL能够解析和生成JSON格式的数据,并支持嵌套结构。
使用方式:spark.read.json(“path/to/data”) 读取JSON文件;df.write.json(“path/to/output”) 将DataFrame保存为JSON格式。
CSV
特点:CSV(逗号分隔值)是最常见的平面文本格式之一,简单易用,但不支持嵌套结构。SparkSQL支持读取和写入CSV文件,并提供了处理缺失值、指定分隔符等功能。
使用方式:spark.read.csv(“path/to/data”) 读取CSV文件;df.write.csv(“path/to/output”) 将DataFrame保存为CSV格式。
Avro
特点:Avro是一种行式存储格式,适合大规模数据的序列化。它支持丰富的数据结构和模式演化,通常用于Hadoop生态系统中的数据存储和传输。
使用方式:spark.read.format(“avro”).load(“path/to/data”) 读取Avro文件;df.write.format(“avro”).save(“path/to/output”) 将DataFrame保存为Avro格式。
ORC
特点:ORC(Optimized Row Columnar)是一种高效的列式存储格式,专为大数据处理而设计,支持高压缩率和快速读取性能。它在存储空间和I/O性能方面表现优越。
使用方式:spark.read.orc(“path/to/data”) 读取ORC文件;df.write.orc(“path/to/output”) 将DataFrame保存为ORC格式。
Hive Tables
特点:SparkSQL能够无缝集成Hive,直接访问Hive元数据,并对Hive表进行查询。它支持HiveQL语法,并能够利用Hive的存储格式和结构。
使用方式:通过spark.sql(“SELECT * FROM hive_table”)查询Hive表;也可以使用saveAsTable将DataFrame写入Hive表。
JDBC/ODBC
特点:SparkSQL支持通过JDBC/ODBC接口连接关系型数据库,如MySQL、PostgreSQL、Oracle等。它允许从数据库读取数据并将结果写回数据库。
使用方式:spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).load() 读取数据库表;df.write.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).save() 将DataFrame写入数据库。
Text Files
特点:SparkSQL可以处理简单的文本文件,每一行被读取为一个字符串。适合用于处理纯文本数据。
使用方式:spark.read.text(“path/to/data”) 读取文本文件;df.write.text(“path/to/output”) 将DataFrame保存为文本格式。
Delta Lake (外部插件)
特点:Delta Lake是一种开源存储层,构建在Parquet格式之上,支持ACID事务、可扩展元数据处理和流批一体的实时数据处理。尽管不是内建的数据源,但它在Spark生态系统中得到了广泛支持。
使用方式:spark.read.format(“delta”).load(“path/to/delta-table”) 读取Delta表;df.write.format(“delta”).save(“path/to/delta-table”) 将DataFrame保存为Delta格式。
测试案例
val df1 = spark.read.format("parquet").load("data/users.parquet") // Use Parquet; you can omit format("parquet") if you wish as it's the default val df2 = spark.read.load("data/users.parquet") // Use CSV val df3 = spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .load("data/people1.csv") // Use JSON val df4 = spark.read.format("json") .load("data/emp.json")
此外还支持 JDBC 的方式:
val jdbcDF = sparkSession .read .format("jdbc") .option("url", "jdbc:mysql://h122.wzk.icu/spark_test?useSSL=false") .option("driver", "com.mysql.jdbc.Driver") .option("user", "hive") .option("password", "hive@wzk.icu") .load() jdbcDF.show()
导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>${spark.version}</version> </dependency>
hive-site
需要在项目的 Resource 目录下,新增一个 hive-site.xml
备注:最好使用 metastore service连接Hive,使用直接metastore的方式时,SparkSQL程序会修改Hive的版本信息
<configuration> <property> <name>hive.metastore.uris</name> <value>thrift://h122.wzk.icu:9083</value> </property> </configuration>
编写代码
object AccessHive { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Demo1") .master("local[*]") .enableHiveSupport() // 设为true时,Spark使用与Hive相同的约定来编写Parquet数据 .config("spark.sql.parquet.writeLegacyFormat", true) .getOrCreate() val sc = spark.sparkContext sc.setLogLevel("warn") spark.sql("show databases").show spark.sql("select * from ods.ods_trade_product_info").show val df: DataFrame = spark.table("ods.ods_trade_product_info") df.show() df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back") spark.table("ods.ods_trade_product_info_back").show spark.close() } }