大数据Spark SQL快速入门

简介: 大数据Spark SQL快速入门

1 SparkSession 应用入口

Spark 2.0开始,应用程序入口为SparkSession,加载不同数据源的数据,封装到DataFrame/Dataset集合数据结构中,使得编程更加简单,程序运行更加快速高效。

个人总结:从RDD就相当于一个集合列表,然后到DS,DF就有了表的概念,然后有SQL对表进行操作.


SparkSession:这是一个新入口,取代了原本的SQLContextAPI的用户来说,Spark常见的混乱源头来自于使用哪个“context”。现在使SparkSession,它作为

单个入口可以兼容两者,注意原本的SQLContext与HiveContext仍然保留,以支持向下兼容。

文档:http://spark.apache.org/docs/2.4.5/sql-getting-started.html#starting-point-sparksession


1)、SparkSession在SparkSQL模块中,添加MAVEN依赖与HiveContext。对于DataFrame

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>

2)、SparkSession对象实例通过建造者模式构建,代码如下:

其中

①表示导入SparkSession所在的包,

  • ②表示建造者模式构建对象和设置属性,
    ③表示导入SparkSession类中implicits对象object中隐式转换函数。
  • 3)、范例演示:构建SparkSession实例,加载文本数据,统计条目数。
import org.apache.spark.sql.{Dataset, SparkSession}
/**
 * Spark 2.x开始,提供了SparkSession类,作为Spark Application程序入口,
 * 用于读取数据和调度Job,底层依然为SparkContext
 */
object SparkStartPoint {
  def main(args: Array[String]): Unit = {
    // TODO: 构建SparkSession实例对象,读取数据
    val spark = SparkSession.builder()
      // 设置应用名称和运行模式
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      // 通过装饰模式获取实例对象,此种方式为线程安全的
      .getOrCreate()
// For implicit conversions like converting RDDs to DataFrames 用于隐式转换,如将RDD转换为DataFrame
    import spark.implicits._
    // TODO: 2. 从文件系统读取数据,包含本地文件系统或HDFS文件系统
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
    println(s"Count = ${inputDS.count()}")
    inputDS.show(10)
    // TODO: 3. 应用运行结束,关闭资源
    spark.stop()
  }
}

使用SparkSession加载数据源数据,将其封装到DataFrame或Dataset中,直接使用show函数就可以显示样本数据(默认显示前20条)。

Spark2.0使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。


2 词频统计WordCount

前面使用RDD封装数据,实现词频统计WordCount功能,从Spark 1.0开始,一直到Spark 2.0,建立在RDD之上的一种新的数据结构DataFrame/Dataset发展而来,更好的实现数据处理分析。DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)和SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。


2.1 基于DSL编程

使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:


第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;

第二步、读取HDFS上文本文件数据;

第三步、使用DSL(Dataset API),类似RDD API处理分析数据;

第四步、控制台打印结果数据和关闭SparkSession;

具体演示代码如下:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
 * 使用SparkSQL进行词频统计WordCount:SQL、DSL
 */
object SparkDSLWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 1、构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 2、读取HDFS上文本文件数据
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
    // TODO: 3、使用DSL(Dataset API),类似RDD API
    val resultDF: DataFrame = inputDS
      // 过滤不合格的数据
      .filter(line => null != line && line.trim.length > 0)
      // 将每行数据进行分割
      .flatMap(line => line.split("\\s+"))
      // 按照单词分组统计:SELECT word, count(1) FROM tb_words GROUP BY word
      .groupBy("value")
      // 使用count函数,获取值类型Long类型 -> 数据库中就是BigInt类型
      .count()
    resultDF.show(10)
    // TODO: 关闭资源
    spark.stop()
  }
}

运行程序结果如下:

2.2 基于SQL编程

也可以实现类似HiveQL方式进行词频统计,直接对单词分组group by,再进行count即可,步

骤如下:

第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;

第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);

第三步、编写SQL语句,使用SparkSession执行获取结果;

第四步、控制台打印结果数据和关闭SparkSession;

具体演示代码如下:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object SparkSQLWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 1、构建SparkSession实例对象,通过建造者模式创建
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .getOrCreate()
    import spark.implicits._
    // TODO: 2、读取HDFS上文本文件数据
    val inputDS: Dataset[String] = spark.read.textFile("datas/wordcount/wordcount.data")
    // TODO: 3、使用DSL(Dataset API),类似RDD API
    val wordsDS: Dataset[String] = inputDS
      // 过滤不合格的数据
      .filter(line => null != line && line.trim.length > 0)
      // 将每行数据分割单词
      .flatMap(line => line.trim.split("\\s+"))
    wordsDS.printSchema()
    wordsDS.show(20)
    // select value, count(1) as cnt from tb_words
    // TODO: 第一步,将Dataset注册为临时视图
    wordsDS.createOrReplaceTempView("view_tmp_words")
    // TODO: 第二步,编写SQL执行分析
    val resultDF: DataFrame = spark.sql(
      """
        |SELECT value, COUNT(1) AS cnt FROM view_tmp_words GROUP BY value ORDER BY cnt DESC
""".stripMargin)
    /*
    +---------+---+
    | value|cnt|
    +---------+---+
    | spark| 11|
    | hive| 6|
    |mapreduce| 4|
    | hadoop| 3|
    | sql| 2|
    | hdfs| 2|
    +---------+---+
    */
    resultDF.show(10)
    // TODO: 关闭资源
    spark.stop()
  }
}

运行程序结果如下:

无论使用DSL还是SQL编程方式,底层转换为RDD操作都是一样,性能一致,查看WEB UI监控中Job

从上述的案例可以发现将数据封装到Dataset/DataFrame中,进行处理分析,更加方便简洁,

这就是Spark框架中针对结构化数据处理模:Spark SQL模块。

官方文档:http://spark.apache.org/sql/


3 数据处理分析

在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析

处理数据,正如前面案例【词频统计WordCount】两种方式:


第一种:DSL(domain-specific language)编程,调用DataFrame/Dataset API(函数),类似

RDD中函数;

第二种:SQL 编程,将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;

两种方式底层转换为RDD操作,包括性能优化完全一致,在实际项目中语句不通的习惯及业务灵活选择。比如机器学习相关特征数据处理,习惯使用DSL编程;比如数据仓库中数据ETL和报表分析,习惯使用SQL编程。无论哪种方式,都是相通的,必须灵活使用掌握。

3.1 基于DSL分析

调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL

语句函数,部分截图如下:

类似SQL语法函数:调用Dataset中API进行数据分析,Dataset中涵盖很多函数,大致分类如下:

  • 1、选择函数select:选取某些列的值

  • 2、过滤函数filter/where:设置过滤条件,类似SQL中WHERE语句
  • 3、分组函数groupBy/rollup/cube:对某些字段分组,在进行聚合统计

  • 4、聚合函数agg:通常与分组函数连用,使用一些count、max、sum等聚合函数操作

  • 5、排序函数sort/orderBy:按照某写列的值进行排序(升序ASC或者降序DESC)

  • 6、限制函数limit:获取前几条数据,类似RDD中take函数

  • 7、重命名函数withColumnRenamed:将某列的名称重新命名

  • 8、删除函数drop:删除某些列

9、增加列函数withColumn:当某列存在时替换值,不存在时添加此列

上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意,调用函数时,通常

指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象。

  • Dataset/DataFrame中转换函数,类似RDD中Transformation函数,使用差不多:

3.2 基于SQL分析

将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

  • 第一步、注册为临时视图
  • 第二步、编写SQL,执行分析

其中SQL语句类似Hive中SQL语句,查看Hive官方文档,SQL查询分析语句语法,官方文档文档:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
SQL 存储 分布式计算
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
本文旨在帮助非专业数据研发但是有高频ODPS使用需求的同学们(如数分、算法、产品等)能够快速上手ODPS查询优化,实现高性能查数看数,避免日常工作中因SQL任务卡壳、失败等情况造成的工作产出delay甚至集群资源稳定性问题。
1361 36
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
|
9月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
461 0
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
607 79
|
8月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
559 2
|
7月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
9月前
|
SQL 人工智能 分布式计算
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
191 0
|
11月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
11月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
450 4