1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
Spark SQL的数据分析应用
3. 实验学时:
4. 实验原理:
DataFrame API 的设计目的是在数据集中操作或转换单个行,如过滤或分组。如果我们想要转换一个数据集中的每一行的列的值,例如将字符串从大写字母转换成驼峰命名形式,那么我们将使用一个函数来实现这一点。函数基本上就是应用于列的方法。Spark SQL 提供了一组通常需要的函数,同时也提供了创建新函数的简单方法。
尽管 Spark SQL 为大多数常见用例提供了大量的内置函数,但总会有一些情况下,这些功能都不能提供您的用例所需要的功能。Spark SQL 提供了一个相当简单的工具来编写用户定义的函数(UDF),并在 Spark 数据处理逻辑或应用程序中使用它们,就像使用内置函数一样。UDFs 实际上是您可以扩展 Spark 的功能以满足您的特定需求的一种方式。我最喜欢Spark 的另一件事是 UDFs 可以用 Python、Java 或 Scala 来写,它们可以利用和集成任何必要的库。因为您能够使用您最熟悉的编程语言来编写 UDFs,所以开发和测试 UDFs 是非常简单和快速的。
5. 实验目的:
掌握Spark SQL常用内置函数的使用。
掌握Spark SQL自定义函数(UDF)的使用。
6. 实验内容:
1、学习Spark SQL常用内置函数的使用。
2、学习Spark SQL自定义函数(UDF)的使用。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1
8. 实验步骤:
8.1 启动Spark与zeppelin
1、在终端窗口下,输入以下命令,分别启动Spark集群和Zeppelin服务器:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh 3. $ zeppelin-daemon.sh start
2、启动浏览器,打开zeppelin notebook首页,点击【Create new note】链接,创建一个新的笔记本,如下图所示:
8.2 Spark SQL内置函数
Spark SQL提供了很多内置函数,包括处理日期时间函数、处理字符串函数和数学处理函数。
1、 处理日期时间函数。日期和时间转换函数有助于将字符串转换为日期、时间戳或 Unix 时间戳,反之亦然。
在内部,它使用 Java 日期格式模式语法。这些函数使用的默认的日期格式是 yyyy-mm-dd HH:mm:ss。
下面的示例显示了将字符串类型的日期和时间戳转换为 Spark date 和 timestamp 类型。
在zeppelin中执行如下代码:
1. //日期数据,最后两列不遵循默认日期格式 2. val testDate = Seq((1, "2018-01-01", "2018-01-01 15:04:58:865", "01-01-2018", "12-05-2017 45:50")) 3. 4. //添加列名 5. val testDateTSDF = testDate.toDF("id","date", "timestamp","date_str", "ts_str") 6. 7. // 将这些字符串转换为 date、timestamp 和 unix timestamp,并指定一个自定义的 date 和 timestamp格式 8. val testDateResultDF= testDateTSDF.select( 9. to_date('date).as("date1"), 10. to_timestamp('timestamp).as("ts1"), 11. to_date('date_str,"MM-dd-yyyy").as("date2"), 12. to_timestamp('ts_str,"MM-dd-yyyy mm:ss").as("ts2"), 13. unix_timestamp('timestamp).as("unix_ts")) 14. 15. //输出DataFrame中的数据结构信息,即为schema 16. testDateResultDF.printSchema 17. 18. //输出DataFrame 19. testDateResultDF.show
执行以上代码,输出结果如下:
1. |-- date1: date (nullable = true) 2. |-- ts1: timestamp (nullable = true) 3. |-- date2: date (nullable = true) 4. |-- ts2: timestamp (nullable = true) 5. |-- unix_ts: long (nullable = true) 6. 7. +----------+----+----------+-------------------+----------+ 8. | date1| ts1| date2| ts2| unix_ts| 9. +----------+----+----------+-------------------+----------+ 10. |2018-01-01|null|2018-01-01|2017-12-05 00:45:50|1514790298| 11. +----------+----+----------+-------------------+----------+
将日期或时间戳转换为时间字符串是很容易的,方法是使用 date_format 函数和定制日 期格式,或者使用 from_unixtime 函数将 Unix 时间戳(以秒为单位)转换成字符串。
在zeppelin中执行如下代码:
1. // 将日期、时间戳和 Unix 时间戳转换成字符串 2. testDateResultDF.select(date_format('date1,"dd-MM-YYYY").as("date_str"), 3. date_format('ts1,"dd-MM-YYYY HH:mm:ss").as("ts_str"), 4. from_unixtime('unix_ts,"dd-MM-YYYY HH:mm:ss").as("unix_ts_str")).show
执行以上代码,输出结果如下:
1. +----------+------+-------------------+ 2. | date_str|ts_str| unix_ts_str| 3. +----------+------+-------------------+ 4. |01-01-2018| null|01-01-2018 15:04:58| 5. +----------+------+-------------------+
在处理时间序列数据(time-series data)时,能够提取日期或时间戳值的特定字段(如 年、月、小时、分钟和秒)的能力是非常方便的。例如,当需要按季度、月或周对所有股票 交易进行分组时,就可以从交易日期提取该信息,并按这些值分组。下面的代码展示了如何从日期或时间戳中提取字段。
在zeppelin中执行如下代码:
1. // 从一个日期值中提取指定的字段 2. val valentimeDateDF=Seq(("2018-02-14 05:35:55")).toDF("date") 3. 4. valentimeDateDF.select( 5. year('date).as("year"), 6. quarter('date).as("quarter"), 7. month('date).as("month"), 8. weekofyear('date).as("woy"), 9. dayofmonth('date).as("dom"), 10. dayofyear('date).as("doy"), 11. hour('date).as("hour"), 12. minute('date).as("minute"), 13. second('date).as("second") 14. ).show
结果:
1. +----+-------+-----+---+---+---+----+------+------+ 2. |year|quarter|month|woy|dom|doy|hour|minute|second| 3. +----+-------+-----+---+---+---+----+------+------+ 4. |2018| 1| 2| 7| 14| 45| 5| 35| 55| 5. +----+-------+-----+---+---+---+----+------+------+
2、处理字符串函数。SparkSQL 内置的字符串函数 提供了操作这类列的通用和强大的方法。
有很多方法可以转换字符串。最常见的是去空格、填充、大写、小写和连接。下面的代 码展示了使用各种内置字符串函数转换字符串的各种方法。
在zeppelin中执行如下代码:
1. //原始数据 2. val sparkDF=Seq((" Spark ")).toDF("name") 3. //原始数据展示 4. sparkDF.show 5. 6. //trimming 7. sparkDF.select(trim('name).as("trim"),ltrim('name).as("ltrim"),rtrim('name).as("rtrim")).show 8. 9. // 用给定的 pad 字符串将字符串填充到指定长度 // 首先去掉"Spark"前后的空格,然后填充到 8 个字符长 10. sparkDF.select(trim('name).as("trim")) 11. .select(lpad('trim, 8, "-").as("lpad"),rpad('trim, 8, "=").as("rpad")) 12. .show 13. 14. // 使用 concatenation,uppercase,lowercase 和 reverse 转换一个字符串 15. val sparkAwesomeDF= Seq(("Spark","is","awesome")).toDF("subject","verb","adj") 16. sparkAwesomeDF.select(concat_ws(" ",'subject,'verb, 'adj).as("sentence")) 17. .select(lower('sentence).as("lower"), 18. upper('sentence).as("upper"), 19. initcap('sentence).as("initcap"), 20. reverse('sentence).as("reverse") 21. ) 22. .show 23. 24. // 从一个字符转换到另一个字符 25. sparkAwesomeDF.select('subject,translate('subject,"ar", "oc").as("translate")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +-------+ 2. | name| 3. +-------+ 4. | Spark | 5. +-------+ 6. 7. +-----+------+------+ 8. | trim| ltrim| rtrim| 9. +-----+------+------+ 10. |Spark|Spark | Spark| 11. +-----+------+------+ 12. 13. +--------+--------+ 14. | lpad| rpad| 15. +--------+--------+ 16. |---Spark|Spark===| 17. +--------+--------+ 18. 19. +----------------+----------------+----------------+----------------+ 20. | lower| upper| initcap| reverse| 21. +----------------+----------------+----------------+----------------+ 22. |spark is awesome|SPARK IS AWESOME|Spark Is Awesome|emosewa si krapS| 23. +----------------+----------------+----------------+----------------+ 24. 25. +-------+---------+ 26. |subject|translate| 27. +-------+---------+ 28. | Spark| Spock| 29. +-------+---------+
正则表达式是一种强大而灵活的方式,可以替换字符串的某些部分或从字符串中提取子 字符串。regexp_extract 和 regexp_replace 函数是专门为这些目的而设计的。Spark 利用 Java 正则表达式库来实现这两个字符串函数的底层实现。 regexp_extract 函数的输入参数是字符串列、匹配的模式和组索引。在字符串中可能会 有多个匹配模式;因此,需要组索引(从 0 开始)来确定是哪一个。如果没有指定模式的匹 配,则该函数返回空字符串。
1. // 使用 regexp_extract字符串函数来提取"fox",使用一个模式 2. val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme") 3. 4. // 使用一个模式 5. rhymeDF.select(regexp_extract('rhyme,"[a-z]*o[xw]",0).as("substring")).show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +---------+ 2. |substring| 3. +---------+ 4. | fox| 5. +---------+
regexp_replace 字符串函数的输入参数是字符串列、匹配的模式、以及替换的值。
1. // 用 regexp_replace字符串函数将“fox”和“Caw”替换为“animal” 2. val rhymeDF1 = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme1") 3. 4. // 下面两行产生相同的输出 5. rhymeDF1.select(regexp_replace('rhyme1,"fox|crow", "animal").as("new_rhyme")).show(false) 6. rhymeDF1.select(regexp_replace('rhyme1,"[a-z]*o[xw]", "animal").as("new_rhyme")).show(false)
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----------------------------------------------------------------+ 2. |new_rhyme | 3. +----------------------------------------------------------------+ 4. |A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"| 5. +----------------------------------------------------------------+ 6. 7. +----------------------------------------------------------------+ 8. |new_rhyme | 9. +----------------------------------------------------------------+ 10. |A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"| 11. +----------------------------------------------------------------+
3、处理 Math 函数。最常见的列类型是数值类型。一个非常有用和常用的名为 round 的函数,它根据给定的规模执行一个数值的半向上的四舍五入。
1. //初始数据集 2. val numberDF = Seq((3.25,4.45)).toDF("pie", "gpa") 3. 4. numberDF.show() 5. 6. //处理 Math 函数 7. numberDF.select(round('pie).as("pie0"), round('pie,1).as("pie1"), round('pie,2).as("pie2"), round('gpa).as("gpa")).show 8. // 因为它是一个半向上的四舍五入,gpa 的值四舍五入到 4.0
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+----+ 2. | pie| gpa| 3. +----+----+ 4. |3.25|4.45| 5. +----+----+ 6. 7. +----+----+----+---+ 8. |pie0|pie1|pie2|gpa| 9. +----+----+----+---+ 10. | 3.0| 3.3|3.25|4.0| 11. +----+----+----+---+
8.3 使用自定义函数(UDF)
使用 UDFs 涉及有三个步骤。第一步是编写一个函数并进行测试。第二步是通过将函数 名及其签名传递给Spark的udf函数来注册该函数。 最后一步是在DataFrame代码或发出SQL 查询时使用 UDF。在 SQL 查询中使用 UDF 时,注册过程略有不同。
1. // 在 Scala 中一个简单的 UDF,将数字级别转换为字母等级 2. // 创建学生成绩 DataFrame 3. import sqlContext.implicits._ 4. import org.apache.spark.sql.functions.udf 5. 6. case class Student(name:String,score:Int) 7. 8. val studentDF= Seq(Student("Joe",85),Student("Jane",90),Student("Mary",55)).toDF() 9. // 注册为视图 10. studentDF.createOrReplaceTempView("students") 11. 12. // 创建一个函数将成绩转换到字母等级 createa functiontoconvert gradetoletter grade 13. def letterGrade(score:Int): String= { 14. score match{ 15. case score if score>100 =>"Cheating" 16. case score if score>=90 =>"A" 17. case score if score>=80 =>"B" 18. case score if score>=70 =>"C" 19. case _ =>"F" } 20. 21. } 22. 23. // 注册为一个 UDF 24. val letterGradeUDF =udf(letterGrade(_:Int):String) 25. 26. // 使用该 UDF 将成绩转换为字母等级 27. studentDF.select($"name",$"score",letterGradeUDF($"score").as("grade")).show 28. 29. // 注册为 UDF,在 SQL 中使用 30. spark.udf.register("letterGrade",letterGrade(_: Int):String) 31. spark.sql("select name,score,letterGrade(score) as grade from students").show
【shift+enter】对程序进行输出。输出结果如下所示:
1. +----+-----+-----+ 2. |name|score|grade| 3. +----+-----+-----+ 4. | Joe| 85| B| 5. |Jane| 90| A| 6. |Mary| 55| F| 7. +----+-----+-----+ 8. 9. +----+-----+-----+ 10. |name|score|grade| 11. +----+-----+-----+ 12. | Joe| 85| B| 13. |Jane| 90| A| 14. |Mary| 55| F| 15. +----+-----+-----+
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习Spark SQL的数据分析应用,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
经过本节实验的学习,通过学习Spark SQL常用内置函数的使用,Spark SQL自定义函数(UDF)的使用。