Spark SQL的数据分析应用

简介: Spark SQL的数据分析应用

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

Spark SQL的数据分析应用

3. 实验学时:

4. 实验原理:

DataFrame API 的设计目的是在数据集中操作或转换单个行,如过滤或分组。如果我们想要转换一个数据集中的每一行的列的值,例如将字符串从大写字母转换成驼峰命名形式,那么我们将使用一个函数来实现这一点。函数基本上就是应用于列的方法。Spark SQL 提供了一组通常需要的函数,同时也提供了创建新函数的简单方法。

 尽管 Spark SQL 为大多数常见用例提供了大量的内置函数,但总会有一些情况下,这些功能都不能提供您的用例所需要的功能。Spark SQL 提供了一个相当简单的工具来编写用户定义的函数(UDF),并在 Spark 数据处理逻辑或应用程序中使用它们,就像使用内置函数一样。UDFs 实际上是您可以扩展 Spark 的功能以满足您的特定需求的一种方式。我最喜欢Spark 的另一件事是 UDFs 可以用 Python、Java 或 Scala 来写,它们可以利用和集成任何必要的库。因为您能够使用您最熟悉的编程语言来编写 UDFs,所以开发和测试 UDFs 是非常简单和快速的。

5. 实验目的:

掌握Spark SQL常用内置函数的使用。

 掌握Spark SQL自定义函数(UDF)的使用。


6. 实验内容:

1、学习Spark SQL常用内置函数的使用。

 2、学习Spark SQL自定义函数(UDF)的使用。


7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1


8. 实验步骤:

8.1 启动Spark与zeppelin

1、在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh
3.  $ zeppelin-daemon.sh start

2、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,如下图所示:

9d3e8800bdae4b009a192d3f775e2471.png


8.2 Spark SQL内置函数

Spark SQL提供了很多内置函数,包括处理日期时间函数、处理字符串函数和数学处理函数。

 1、 处理日期时间函数。日期和时间转换函数有助于将字符串转换为日期、时间戳或 Unix 时间戳,反之亦然。

在内部,它使用 Java 日期格式模式语法。这些函数使用的默认的日期格式是 yyyy-mm-dd HH:mm:ss。

 下面的示例显示了将字符串类型的日期和时间戳转换为 Spark date 和 timestamp 类型。

 在zeppelin中执行如下代码:

1.  //日期数据,最后两列不遵循默认日期格式
2.  val testDate = Seq((1, "2018-01-01", "2018-01-01 15:04:58:865", "01-01-2018", "12-05-2017 45:50"))
3.       
4.  //添加列名
5.  val testDateTSDF = testDate.toDF("id","date", "timestamp","date_str", "ts_str")
6.       
7.  // 将这些字符串转换为 date、timestamp 和 unix timestamp,并指定一个自定义的 date 和 timestamp格式
8.  val testDateResultDF= testDateTSDF.select(
9.      to_date('date).as("date1"), 
10.     to_timestamp('timestamp).as("ts1"), 
11.     to_date('date_str,"MM-dd-yyyy").as("date2"), 
12.     to_timestamp('ts_str,"MM-dd-yyyy mm:ss").as("ts2"), 
13.     unix_timestamp('timestamp).as("unix_ts"))
14.      
15. //输出DataFrame中的数据结构信息,即为schema
16. testDateResultDF.printSchema
17.      
18. //输出DataFrame
19. testDateResultDF.show

执行以上代码,输出结果如下:

1.   |-- date1: date (nullable = true)
2.   |-- ts1: timestamp (nullable = true)
3.   |-- date2: date (nullable = true)
4.   |-- ts2: timestamp (nullable = true)
5.   |-- unix_ts: long (nullable = true)
6.       
7.  +----------+----+----------+-------------------+----------+
8.  |     date1| ts1|     date2|                ts2|   unix_ts|
9.  +----------+----+----------+-------------------+----------+
10. |2018-01-01|null|2018-01-01|2017-12-05 00:45:50|1514790298|
11. +----------+----+----------+-------------------+----------+

将日期或时间戳转换为时间字符串是很容易的,方法是使用 date_format 函数和定制日 期格式,或者使用 from_unixtime 函数将 Unix 时间戳(以秒为单位)转换成字符串。

 在zeppelin中执行如下代码:

1.  // 将日期、时间戳和 Unix 时间戳转换成字符串 
2.  testDateResultDF.select(date_format('date1,"dd-MM-YYYY").as("date_str"), 
3.                          date_format('ts1,"dd-MM-YYYY HH:mm:ss").as("ts_str"),
4.                          from_unixtime('unix_ts,"dd-MM-YYYY HH:mm:ss").as("unix_ts_str")).show

执行以上代码,输出结果如下:

1.  +----------+------+-------------------+
2.  |  date_str|ts_str|        unix_ts_str|
3.  +----------+------+-------------------+
4.  |01-01-2018|  null|01-01-2018 15:04:58|
5.  +----------+------+-------------------+

在处理时间序列数据(time-series data)时,能够提取日期或时间戳值的特定字段(如 年、月、小时、分钟和秒)的能力是非常方便的。例如,当需要按季度、月或周对所有股票 交易进行分组时,就可以从交易日期提取该信息,并按这些值分组。下面的代码展示了如何从日期或时间戳中提取字段。

 在zeppelin中执行如下代码:

1.  // 从一个日期值中提取指定的字段 
2.  val valentimeDateDF=Seq(("2018-02-14 05:35:55")).toDF("date") 
3.       
4.  valentimeDateDF.select(
5.      year('date).as("year"), 
6.      quarter('date).as("quarter"), 
7.      month('date).as("month"), 
8.      weekofyear('date).as("woy"), 
9.      dayofmonth('date).as("dom"), 
10.     dayofyear('date).as("doy"), 
11.     hour('date).as("hour"), 
12.     minute('date).as("minute"), 
13.     second('date).as("second")
14. ).show

结果:

1.  +----+-------+-----+---+---+---+----+------+------+
2.  |year|quarter|month|woy|dom|doy|hour|minute|second|
3.  +----+-------+-----+---+---+---+----+------+------+
4.  |2018|      1|    2|  7| 14| 45|   5|    35|    55|
5.  +----+-------+-----+---+---+---+----+------+------+

2、处理字符串函数。SparkSQL 内置的字符串函数 提供了操作这类列的通用和强大的方法。

 有很多方法可以转换字符串。最常见的是去空格、填充、大写、小写和连接。下面的代 码展示了使用各种内置字符串函数转换字符串的各种方法。

 在zeppelin中执行如下代码:

1.  //原始数据
2.  val sparkDF=Seq((" Spark ")).toDF("name")
3.  //原始数据展示
4.  sparkDF.show
5.       
6.  //trimming 
7.  sparkDF.select(trim('name).as("trim"),ltrim('name).as("ltrim"),rtrim('name).as("rtrim")).show
8.       
9.  // 用给定的 pad 字符串将字符串填充到指定长度 // 首先去掉"Spark"前后的空格,然后填充到 8 个字符长 
10. sparkDF.select(trim('name).as("trim"))
11.        .select(lpad('trim, 8, "-").as("lpad"),rpad('trim, 8, "=").as("rpad"))
12.        .show
13.      
14. // 使用 concatenation,uppercase,lowercase 和 reverse 转换一个字符串 
15. val sparkAwesomeDF= Seq(("Spark","is","awesome")).toDF("subject","verb","adj") 
16. sparkAwesomeDF.select(concat_ws(" ",'subject,'verb, 'adj).as("sentence"))
17.               .select(lower('sentence).as("lower"), 
18.                       upper('sentence).as("upper"), 
19.                       initcap('sentence).as("initcap"), 
20.                       reverse('sentence).as("reverse")
21.                       )
22.               .show
23.      
24. // 从一个字符转换到另一个字符 
25. sparkAwesomeDF.select('subject,translate('subject,"ar", "oc").as("translate")).show

【shift+enter】对程序进行输出。输出结果如下所示:

1.  +-------+
2.  |   name|
3.  +-------+
4.  | Spark |
5.  +-------+
6.       
7.  +-----+------+------+
8.  | trim| ltrim| rtrim|
9.  +-----+------+------+
10. |Spark|Spark | Spark|
11. +-----+------+------+
12.      
13. +--------+--------+
14. |    lpad|    rpad|
15. +--------+--------+
16. |---Spark|Spark===|
17. +--------+--------+
18.      
19. +----------------+----------------+----------------+----------------+
20. |           lower|           upper|         initcap|         reverse|
21. +----------------+----------------+----------------+----------------+
22. |spark is awesome|SPARK IS AWESOME|Spark Is Awesome|emosewa si krapS|
23. +----------------+----------------+----------------+----------------+
24.      
25. +-------+---------+
26. |subject|translate|
27. +-------+---------+
28. |  Spark|    Spock|
29. +-------+---------+

正则表达式是一种强大而灵活的方式,可以替换字符串的某些部分或从字符串中提取子 字符串。regexp_extract 和 regexp_replace 函数是专门为这些目的而设计的。Spark 利用 Java 正则表达式库来实现这两个字符串函数的底层实现。 regexp_extract 函数的输入参数是字符串列、匹配的模式和组索引。在字符串中可能会 有多个匹配模式;因此,需要组索引(从 0 开始)来确定是哪一个。如果没有指定模式的匹 配,则该函数返回空字符串。


1.  // 使用 regexp_extract字符串函数来提取"fox",使用一个模式 
2.  val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme")
3.       
4.  // 使用一个模式 
5.  rhymeDF.select(regexp_extract('rhyme,"[a-z]*o[xw]",0).as("substring")).show

【shift+enter】对程序进行输出。输出结果如下所示:

1.  +---------+
2.  |substring|
3.  +---------+
4.  |      fox|
5.  +---------+

regexp_replace 字符串函数的输入参数是字符串列、匹配的模式、以及替换的值。

1.  // 用 regexp_replace字符串函数将“fox”和“Caw”替换为“animal” 
2.  val rhymeDF1 = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme1")
3.       
4.  // 下面两行产生相同的输出
5.  rhymeDF1.select(regexp_replace('rhyme1,"fox|crow", "animal").as("new_rhyme")).show(false) 
6.  rhymeDF1.select(regexp_replace('rhyme1,"[a-z]*o[xw]", "animal").as("new_rhyme")).show(false)

【shift+enter】对程序进行输出。输出结果如下所示:

1.  +----------------------------------------------------------------+
2.  |new_rhyme                                                       |
3.  +----------------------------------------------------------------+
4.  |A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"|
5.  +----------------------------------------------------------------+
6.       
7.  +----------------------------------------------------------------+
8.  |new_rhyme                                                       |
9.  +----------------------------------------------------------------+
10. |A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"|
11. +----------------------------------------------------------------+

3、处理 Math 函数。最常见的列类型是数值类型。一个非常有用和常用的名为 round 的函数,它根据给定的规模执行一个数值的半向上的四舍五入。

1.  //初始数据集
2.  val numberDF = Seq((3.25,4.45)).toDF("pie", "gpa")
3.       
4.  numberDF.show()
5.       
6.  //处理 Math 函数 
7.  numberDF.select(round('pie).as("pie0"), round('pie,1).as("pie1"), round('pie,2).as("pie2"), round('gpa).as("gpa")).show
8.  // 因为它是一个半向上的四舍五入,gpa 的值四舍五入到 4.0

【shift+enter】对程序进行输出。输出结果如下所示:

1.  +----+----+
2.  | pie| gpa|
3.  +----+----+
4.  |3.25|4.45|
5.  +----+----+
6.       
7.  +----+----+----+---+
8.  |pie0|pie1|pie2|gpa|
9.  +----+----+----+---+
10. | 3.0| 3.3|3.25|4.0|
11. +----+----+----+---+

8.3 使用自定义函数(UDF)

使用 UDFs 涉及有三个步骤。第一步是编写一个函数并进行测试。第二步是通过将函数 名及其签名传递给Spark的udf函数来注册该函数。 最后一步是在DataFrame代码或发出SQL 查询时使用 UDF。在 SQL 查询中使用 UDF 时,注册过程略有不同。

1.  // 在 Scala 中一个简单的 UDF,将数字级别转换为字母等级 
2.  // 创建学生成绩 DataFrame 
3.  import sqlContext.implicits._
4.  import org.apache.spark.sql.functions.udf
5.  
6.  case class Student(name:String,score:Int)
7.       
8.  val studentDF= Seq(Student("Joe",85),Student("Jane",90),Student("Mary",55)).toDF()
9.  // 注册为视图 
10. studentDF.createOrReplaceTempView("students")
11.      
12. // 创建一个函数将成绩转换到字母等级 createa functiontoconvert gradetoletter grade 
13. def letterGrade(score:Int): String= { 
14.     score match{ 
15.         case score if score>100 =>"Cheating" 
16.         case score if score>=90 =>"A" 
17.         case score if score>=80 =>"B" 
18.         case score if score>=70 =>"C" 
19.         case _ =>"F" } 
20. 
21. }
22.      
23. // 注册为一个 UDF
24. val letterGradeUDF =udf(letterGrade(_:Int):String)
25.      
26. // 使用该 UDF 将成绩转换为字母等级 
27. studentDF.select($"name",$"score",letterGradeUDF($"score").as("grade")).show
28.      
29. // 注册为 UDF,在 SQL 中使用
30. spark.udf.register("letterGrade",letterGrade(_: Int):String)
31. spark.sql("select name,score,letterGrade(score) as grade from students").show

【shift+enter】对程序进行输出。输出结果如下所示:

1.   +----+-----+-----+
2.  |name|score|grade|
3.  +----+-----+-----+
4.  | Joe|   85|    B|
5.  |Jane|   90|    A|
6.  |Mary|   55|    F|
7.  +----+-----+-----+
8.       
9.  +----+-----+-----+
10. |name|score|grade|
11. +----+-----+-----+
12. | Joe|   85|    B|
13. |Jane|   90|    A|
14. |Mary|   55|    F|
15. +----+-----+-----+

9. 实验结果及分析:

实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过学习Spark SQL的数据分析应用,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

经过本节实验的学习,通过学习Spark SQL常用内置函数的使用,Spark SQL自定义函数(UDF)的使用。


d965a7bbcd37484ba0daffe36f6e00ea.png

相关文章
|
1月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
194 1
|
10天前
|
数据采集 监控 数据可视化
BI工具在数据分析和业务洞察中的应用
BI工具在数据分析和业务洞察中的应用
43 11
|
26天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
69 5
|
22天前
|
SQL 数据库
如何应用SQL约束条件?
【10月更文挑战第28天】如何应用SQL约束条件?
40 11
|
22天前
|
数据采集 数据可视化 数据挖掘
数据驱动决策:BI工具在数据分析和业务洞察中的应用
【10月更文挑战第28天】在信息爆炸的时代,数据成为企业决策的重要依据。本文综述了商业智能(BI)工具在数据分析和业务洞察中的应用,介绍了数据整合、清洗、可视化及报告生成等功能,并结合实际案例探讨了其价值。BI工具如Tableau、Power BI、QlikView等,通过高效的数据处理和分析,助力企业提升竞争力。
38 5
|
29天前
|
机器学习/深度学习 并行计算 数据挖掘
R语言是一种强大的统计分析工具,广泛应用于数据分析和机器学习领域
【10月更文挑战第21天】R语言是一种强大的统计分析工具,广泛应用于数据分析和机器学习领域。本文将介绍R语言中的一些高级编程技巧,包括函数式编程、向量化运算、字符串处理、循环和条件语句、异常处理和性能优化等方面,以帮助读者更好地掌握R语言的编程技巧,提高数据分析的效率。
43 2
|
1月前
|
SQL Oracle 关系型数据库
SQL语言的主要标准及其应用技巧
SQL(Structured Query Language)是数据库领域的标准语言,广泛应用于各种数据库管理系统(DBMS)中,如MySQL、Oracle、SQL Server等
|
15天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
15天前
|
SQL 监控 安全
员工上网行为监控软件:SQL 在数据查询监控中的应用解析
在数字化办公环境中,员工上网行为监控软件对企业网络安全和管理至关重要。通过 SQL 查询和分析数据库中的数据,企业可以精准了解员工的上网行为,包括基础查询、复杂条件查询、数据统计与分析等,从而提高网络管理和安全防护的效率。
26 0
|
17天前
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
下一篇
无影云桌面