编译:诚历,阿里巴巴计算平台事业部 EMR 技术专家,Apache Sentry PMC,Apache Commons Committer,目前从事开源大数据存储和优化方面的工作。
原文链接 :http://blog.madhukaraphatak.com/data-modeling-spark-part-1/
数据建模是数据分析重要的组成之一,正确的建立模型有助于用户更好地解答业务相关的问题。在过去几十年中,数据建模技术也一直是SQL数据仓库的基础。
Apache Spark作为新一代的数仓技术的代表,我们能够在 Spark 中使用早期的数据建模技术。这使得Spark data pineline 更加有效。
在本系列文章中,我将讨论spark中不同的数据建模。本系列的第一篇文章中将讨论如何使用日期维度。
数据分析中数据和时间的重要性
我们分析的大多数数据通常都包含日期或时间戳。例如,它可能是
- 股票的交易日期
- POS系统的交易时间
我们所做的很多分析通常都是关于日期或时间的。我们通常希望使用相同的方法对数据进行切分。
使用内置的Spark进行数据分析
本节讨论如何使用内置的spark日期函数进行数据分析。
苹果股票数据
在本例中,我们将使用苹果股票数据。以下是样本数据
Date | Open | High | Low | Close | Volume | AdjClose |
---|---|---|---|---|---|---|
2013-12-31 00:00:00 | 554.170013 | 561.279976 | 554.000023 | 561.019997 | 55771100 | 76.297771 |
2013-12-30 00:00:00 | 557.460022 | 560.089989 | 552.319984 | 554.519981 | 63407400 | 75.41378 |
加载到Spark Dataframe
下面的代码将数据加载到spark dataframe中。
val appleStockDf = sparkSession.read.format("csv").
option("header","true")
.option("inferSchema","true")
.load("src/main/resources/applestock_2013.csv")
分析日期
在本节中,让我们看看如何回答与日期相关的问题。
- 有属于周末的记录吗?
这种分析通常是为了确保数据的质量。周末应该不会有任何数据,因为周末不会有交易。
assert(sparkSession.sql
("select * from stocks where dayofweek(Date)==1 or
dayofweek(Date)==7").count() == 0)
在上述代码中,1表示星期天,7表示星期六。我们可以看到,代码是不可读的,除非我们知道如何解码这些神奇的数字。
- 显示季度最高价格
这个分析找到了给定季度的最大值。
appleStockDf.groupBy(year($"Date"),quarter($"Date")).
avg("Close").
sort("year(Date)","quarter(Date)")
.show()
使用Spark日期函数进行数据分析的挑战
尽管我们可以使用spark builtin数据函数来完成上面的分析,但是编写它们是很困难的。此外,从外部BI解决方案很难表达这些需求,通常业务分析师用户是最终用户。因此,我们需要一种更简单、更好的方法来实现上述目标。
日期维度
日期维是一个静态数据集,它在列中列出给定日期的所有不同属性。这个示例数据集模式如下所示
t
|-- date_key: integer (nullable = true)
|-- full_date: string (nullable = true)
|-- day_of_week: integer (nullable = true)
|-- day_num_in_month: integer (nullable = true)
|-- day_num_overall: integer (nullable = true)
|-- day_name: string (nullable = true)
|-- day_abbrev: string (nullable = true)
|-- weekday_flag: string (nullable = true)
|-- week_num_in_year: integer (nullable = true)
|-- week_num_overall: integer (nullable = true)
|-- week_begin_date: string (nullable = true)
|-- week_begin_date_key: integer (nullable = true)
|-- month: integer (nullable = true)
|-- month_num_overall: integer (nullable = true)
|-- month_name: string (nullable = true)
|-- month_abbrev: string (nullable = true)
|-- quarter: integer (nullable = true)
|-- year: integer (nullable = true)
|-- yearmo: integer (nullable = true)
|-- fiscal_month: integer (nullable = true)
|-- fiscal_quarter: integer (nullable = true)
|-- fiscal_year: integer (nullable = true)
|-- last_day_in_month_flag: string (nullable = true)
|-- same_day_year_ago: string (nullable = true)
在上面的格式中,一些重要的列是
* full_date - Timestamp for given day
* year - year in the date
* quarter - quarter the given date belongs
etc.
这个静态数据集可以生成多年并保持可用。我们在示例中使用的示例可以从下面的链接下载。
使用日期维度进行数据分析
本节讨论如何使用日期维度进行上述分析。
加载数据以触发Dataframe
我们可以为我们的数据集创建一个dataframe,如下所示
val originalDf = sparkSession.read.format("csv").
option("header","true")
.option("inferSchema","true")
.load(dataPath)
//replace space in the column names
val new_columns = originalDf.schema.fields
.map(value => value.copy(name = value.name.replaceAll("\\s+","_")))
val newSchema = StructType(new_columns)
val newNameDf = sparkSession.createDataFrame(originalDf.rdd, newSchema)
import org.apache.spark.sql.functions._
val dateDf = newNameDf.withColumn("full_date_formatted",
to_date(newNameDf.col("full_date"),"dd/MM/yy"))
在上面的代码中,进行了预处理,将字符串转换为spark date数据类型。
与股票数据连接
我们可以使用spark连接将股票数据与日期结合起来
val joinedDF = appleStockDf.join(dateDf, appleStockDf.col("Date") ===
dateDf.col("full_date_formatted"))
This join doesn’t increase size of the data as it’s an inner join.
这个连接并不会增加数据的大小,因为它是一个内部连接。
分析
本节介绍如何在不使用复杂的spark函数的情况下进行分析
- 有属于周末的记录吗?
assert(joinedDF.filter("weekday_flag != 'y'").count()==0)
- 显示季度最高价格
joinedDF.groupBy("year","quarter").
avg("Close").
sort("year","quarter")
.show()
日期维度的优点
本节讨论日期维度的优点。
跨不同分析的重用
相同的数据集可以用于不同的数据分析。与在查询中编写特殊函数或在数据集本身上添加这些列不同,拥有标准的日期维度有助于使所有的数据分析标准化。
Scalable
用户可以在日期维度上添加更多的属性,如区域假日等。这将丰富每个人的分析。这里不需要额外的查询。
用户友好
使用日期维度生成的查询更容易理解。
引用
代码
文本示例代码可以在 github 上查看
https://github.com/phatak-dev/spark2.0-examples/blob/2.4/src/main/scala/com/madhukaraphatak/examples/sparktwo/datamodeling/DateHandlingExample.scala
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。