Spark DataFrame 不是真正的 DataFrame

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 最早的 "DataFrame" ,来源于贝尔实验室开发的 S 语言。R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。

文章原载于 Mars 团队专栏,欢迎关注。

从这篇文章开始,我们开始一个新的读 paper 系列。

今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。

个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。

这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分:

  1. 什么是真正的 DataFrame?
  2. 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。
  3. 从 Mars DataFrame 的角度来看这个问题。

什么是真正的 DataFrame?

起源

最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data frame" 在 1990 年就发布了,书《S 语言统计模型》第3章里详述了它的概念,书里着重强调了 dataframe 的矩阵起源。

书中描述 DataFrame 看上去很像矩阵,且支持类似矩阵的操作;同时又很像关系表。

R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。这些 DataFrame 都同宗同源,有着相同的语义和数据模型。

1_i8wjdd4yhrbV6iBmXmRcQg

DataFrame 数据模型

DataFrame 的需求来源于把数据看成矩阵和表。但是,矩阵中只包含一种数据类型,未免过于受限;同时,关系表要求数据必须要首先定义 schema。对于 DataFrame 来说,它的列类型可以在运行时推断,并不需要提前知晓,也不要求所有列都是一个类型。因此,DataFrame 可以理解成是关系系统、矩阵、甚至是电子表格程序(典型如 Excel)的合体。

1_F_HYvbpOXK1LnKkPC9b6ZA

跟关系系统相比,DataFrame 有几个特别有意思的属性,让 DataFrame 因此独一无二。

保证顺序,行列对称

首先,无论在行还是列方向上,DataFrame 都是有顺序的;且行和列都是一等公民,不会区分对待。

拿 pandas 举例子,当创建了一个 DataFrame 后,无论行和列上数据都是有顺序的,因此,在行和列上都可以使用位置来选择数据。

In [1]: import pandas as pd                                                     

In [2]: import numpy as np                                                      

In [3]: df = pd.DataFrame(np.random.rand(5, 4))                                 

In [4]: df                                                                      
Out[4]: 
          0         1         2         3
0  0.736385  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

In [5]: df.iat[2, 2]  # 第二行第二列元素                                                             
Out[5]: 0.40278182653648853

因为行和列的对称关系,因此聚合函数在两个方向上都可以计算,只需指定 axis 即可。

In [6]: df.sum()  # 默认 axis == 0,在行方向上做聚合,因此结果是4个元素                                                                
Out[6]: 
0    2.138135
1    2.961325
2    2.508257
3    2.458257
dtype: float64

In [7]: df.sum(axis=1)  # axis == 1,在列方向上做聚合,因此是5个元素                                                        
Out[7]: 
0    2.874434
1    2.266533
2    1.454032
3    2.201998
4    1.268976
dtype: float64

如果熟悉 numpy(数值计算库,包含多维数组和矩阵的定义),可以看到这个特性非常熟悉,从而可以看出 DataFrame 的矩阵本质。

丰富的 API

DataFrame 的 API 非常丰富,横跨关系(如 filter、join)、线性代数(如 transpose、dot)以及类似电子表格(如 pivot)的操作。

还是以 pandas 为例,一个 DataFrame 可以做转置操作,让行和列对调。

In [8]: df.transpose()                                                          
Out[8]: 
          0         1         2         3         4
0  0.736385  0.319533  0.440825  0.300279  0.341113
1  0.271232  0.891928  0.500724  0.483571  0.813870
2  0.940270  0.471176  0.402782  0.639299  0.054731
3  0.926548  0.583895  0.109702  0.778849  0.059262

直观的语法,适合交互式分析

用户可以对 DataFrame 数据不断进行探索,查询结果可以被后续的结果复用,可以非常方便地用编程的方式组合非常复杂的操作,很适合交互式的分析。

列中允许异构数据

DataFrame 的类型系统允许一列中有异构数据的存在,比如,一个 int 列中允许有 string 类型数据存在,它可能是脏数据。这点看出 DataFrame 非常灵活。

In [10]: df2 = df.copy()                                                        

In [11]: df2.iloc[0, 0] = 'a'                                                   

In [12]: df2                                                                    
Out[12]: 
          0         1         2         3
0         a  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

数据模型

现在我们可以对什么是真正的 DataFrame 正式下定义:

1_mTEG3yv1jqGfqlizFuQvaw

DataFrame 由二维混合类型的数组、行标签、列标签、以及类型(types 或者 domains)组成。在每列上,这个类型是可选的,可以在运行时推断。从行上看,可以把 DataFrame 看做行标签到行的映射,且行之间保证顺序;从列上看,可以看做列类型到列标签到列的映射,同样,列间同样保证顺序。

行标签和列标签的存在,让选择数据时非常方便。

In [13]: df.index = pd.date_range('2020-4-15', periods=5)                       

In [14]: df.columns = ['c1', 'c2', 'c3', 'c4']                                  

In [15]: df                                                                     
Out[15]: 
                  c1        c2        c3        c4
2020-04-15  0.736385  0.271232  0.940270  0.926548
2020-04-16  0.319533  0.891928  0.471176  0.583895
2020-04-17  0.440825  0.500724  0.402782  0.109702
2020-04-18  0.300279  0.483571  0.639299  0.778849
2020-04-19  0.341113  0.813870  0.054731  0.059262

In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3']  # 注意这里的切片是闭区间                         
Out[16]: 
                  c2        c3
2020-04-16  0.891928  0.471176
2020-04-17  0.500724  0.402782
2020-04-18  0.483571  0.639299

这里的 indexcolumns 就分别是行和列标签。我们可以很容易选择一段时间(行上选择)和几列(列上选择)数据。当然这些建立在数据是按顺序存储的基础上。

按顺序存储的特性让 DataFrame 非常适合用来做统计方面的工作。

In [17]: df3 = df.shift(1)  # 把 df 的数据整体下移一格,行列索引保持不变                                                      

In [18]: df3                                                                    
Out[18]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16  0.736385  0.271232  0.940270  0.926548
2020-04-17  0.319533  0.891928  0.471176  0.583895
2020-04-18  0.440825  0.500724  0.402782  0.109702
2020-04-19  0.300279  0.483571  0.639299  0.778849

In [19]: df - df3  # 数据减法会自动按标签对齐,因此这一步可以用来计算环比                                                             
Out[19]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

In [21]: (df - df3).bfill()  # 第一行的空数据按下一行填充                                                    
Out[21]: 
                  c1        c2        c3        c4
2020-04-15 -0.416852  0.620697 -0.469093 -0.342653
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

从例子看到,正因为数据是按顺序存放的,因此我们可以索引保持不变,整体下移一行,这样,昨天的数据就到了今天的行上,然后拿原数据减去位移后的数据时,因为 DataFrame 会自动按标签做对齐,因此,对于一个日期,相当于用当天的数据减去了前天的数据,这样就可以做类似于环比的操作。这简直太方便了。试想,对于关系系统来说,恐怕需要想办法找一列作为 join 的条件,然后再做减法等等。最后,对于空数据,我们还可以填充上一行(ffill)或者下一行的数据(bfill)。想在关系系统里想达到同样效果,想必是需要大费周章的。

DataFrame 的真正含义正在被杀死

近几年,DataFrame 系统如同雨后春笋般出现,然而,这其中的绝大多数系统只包含了关系表的语义,并不包含我们之前说的矩阵方面的意义,且它们大多也并不保证数据顺序,因此真正 DataFrame 所拥有的统计和机器学习方面的特质也不复存在。这些 “DataFrame” 系统的出现,让 “DataFrame” 这个词本身几乎变得没有意义。数据科学家们为了处理大规模的数据,思维方式不得不作出改变,这其中必然存在风险。

Spark DataFrame 和 Koalas 不是真正的 DataFrame

这些 DataFrame 系统的代表是 Spark DataFrame, Spark 当然是伟大的,它解决了数据规模的问题;同时又首次把 ”DataFrame“ 的概念带到了大数据的领域。但其实它只是 spark.sql的另一种形式(当然 Spark DataFrame 确实在 spark.sql 下)。Spark DataFrame 只包含了关系表的语义,schema 需要确定,数据也并不保证顺序。

1_i4w87MsbSSi7lHw_csDaMQ

那么会有同学说 Koalas 呢?Koalas 提供了 pandas API,用 pandas 的语法就可以在 spark 上分析了。实际上,因为 Koalas 也是将 pandas 的操作转成 Spark DataFrame 来执行,因为 Spark DataFrame 内核本身的特性,注定 Koalas 只是看上去和 pandas 一致。

对于 pandas,我们按天聚合,并按 30 天滑动窗口来计算平均值。

In [22]: df = pd.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', 
    ...: names=['Date','Hour','Origin','Destination','Trip Count'])             

In [23]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()      
Out[23]: <matplotlib.axes._subplots.AxesSubplot at 0x118077d90>

out2

如果是 Koalas,因为它的 API 看上去和 pandas 一致,因此,我们按照 Koalas 的文档做 import 替换。

In [1]: import databricks.koalas as ks
  
In [2]: df = ks.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', names=['Date','Hour','Origin','Destination','Trip Count'])
  
In [3]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()

out

然后令人惊讶的是,结果并不一致。大费周章后才查到,原因是顺序问题,聚合的结果后并不保证排序,因此要得到一样的结果需要在 rolling 前加 sort_index(),确保 groupby 后的结果是排序的。

In [4]: df.groupby('Date').mean()['Trip Count'].sort_index().rolling(30).mean().plot()

默认的排序规则非常重要,这对以时间作为索引的数据尤其关键,而且这让数据科学家更容易观察数据,也更容易复现结果。

所以,在使用 Koalas 时请小心,要时刻关注你的数据在你心中是不是排序的,因为 Koalas 很可能表现地和你想的不一致。

让我们再看 shift,它能工作的一个前提就是数据是排序的,那么在 Koalas 中调用会发生什么呢?

In [6]: df.shift(1)                                                             
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o110.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING))' due to data type mismatch: argument 1 requires (double or float) type, however, 'lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)' is of timestamp type.;;
'Project [__index_level_0__#41, CASE WHEN (isnull(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)))) THEN null ELSE lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Date#87, CASE WHEN (isnull(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Hour#88, CASE WHEN (isnull(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Origin#89, CASE WHEN (isnull(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Destination#90, CASE WHEN (isnull(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Trip Count#91, __natural_order__#50L]
+- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS __natural_order__#50L]
   +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34]
      +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, _w0#42L, _we0#43, (_we0#43 - 1) AS __index_level_0__#41]
         +- Window [row_number() windowspecdefinition(_w0#42L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#43], [_w0#42L ASC NULLS FIRST]
            +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS _w0#42L]
               +- Project [0#20 AS Date#30, 1#21 AS Hour#31, 2#22 AS Origin#32, 3#23 AS Destination#33, 4#24 AS Trip Count#34]
                  +- Project [_c0#10 AS 0#20, _c1#11 AS 1#21, _c2#12 AS 2#22, _c3#13 AS 3#23, _c4#14 AS 4#24]
                     +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:116)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at 
  ...  # 后面还有一大段报错信息,此处省略

这个报错可能会让数据科学家们震惊,什么,我就做了个 shift 啊,报错里掺杂着 Java 异常栈和一大堆看不懂的错误。

这里真正的错误和 Date 是时间戳有关,那么我们只取 int 类型的字段做 shift 总可以了吧。

In [10]: df['Hour'].shift(1)                                                    
Out[10]: 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

确实可以运行,但却看到一句话,大意是数据会被放到一个分区来执行,这正是因为数据本身之间并不保证顺序,因此只能把数据收集到一起,排序,再调用 shift。这样就不再是一个分布式的程序了,甚至比 pandas 本身更慢。

DataFrame.dot 等矩阵相关的操作在 Koalas 里也不包含,这些操作已经很难用关系代数来表达了。

PyODPS DataFrame

相信用过 MaxCompute(原名 ODPS,阿里云自研的大数据系统),应该会听说过 PyODPS。这个库是我们前几年的产品,PyODPS 里也包含一个 DataFrame,而 PyODPS DataFrame 在执行的时候会被编译到 ODPS SQL 来执行。

提 PyODPS DataFrame 的原因是,我们在几年前就发现,虽然它提供了 pandas-like 的接口,一定程度上让用户能用类似 pandas 的思维解决问题,然而,当用户问我们,如何向后填充数据?如何通过索引获取数据?答案都是不能。原因也是一样的,因为 PyODPS DataFrame 只是将计算代理给不保证有序、只有关系代数算子的引擎来执行。

如果系统本身的数据模型不是真正的 DataFrame 模型,仅仅让接口看起来像是远远不够的。

Mars DataFrame

因此这里要说到 Mars DataFrame,其实我们做 Mars 的初衷和这篇 paper 的想法是一致的,因为现有的系统虽然能很好地解决规模问题,但那些传统数据科学包中好的部分却被人遗忘了,我们希望 Mars 能保留这些库中好的部分,又能解决规模问题,也能充分利用新硬件。

Mars DataFrame 会自动将 DataFrame 分割成很多小的 chunk,每个 chunk 也是一个 DataFrame,而无论是 chunk 间还是 chunk 内的数据,都保证顺序。

image_20200415170452820

图里的示例中,一个行数 380、列数 370 的 DataFrame,被 Mars 分成 3x3 一共 9 个 chunk,根据计算在 CPU 还是 NVIDIA GPU 上进行,用 pandas DataFrame 或者 cuDF DataFrame 来存储数据和执行真正的计算。可以看到,Mars 既会在行上,也会在列上进行分割,这种在行上和列上的对等性,让 DataFrame 的矩阵本质能得以发挥。

在单机真正执行时,根据初始数据的位置,Mars 会自动把数据分散到多核或者多卡执行;对于分布式,会将计算分散到多台机器执行。

Mars DataFrame 保留了行标签、列标签和类型的概念。因此能够想象如同 pandas 一样,可以在比较大的数据集上根据标签进行筛选。

In [1]: import mars.dataframe as md                                             

In [2]: import mars.tensor as mt

In [8]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000), 
   ...:                   index=md.date_range('2020-1-1', periods=10000))       

In [9]: df.loc['2020-4-15'].execute()                                           
Out[9]: 
0    0.622763
1    0.446635
2    0.007870
3    0.107846
4    0.288893
5    0.219340
6    0.228806
7    0.969435
8    0.033130
9    0.853619
Name: 2020-04-15 00:00:00, dtype: float64

Mars 会保持和 pandas 一致的排序特性,因此对于 groupby 等操作,无需担心结果和所想不一致。

In [6]: import mars.dataframe as md                                             

In [7]: df = md.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', n
   ...: ames=['Date','Hour','Origin','Destination','Trip Count'])               

In [8]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() # 结果正确      
Out[8]: <matplotlib.axes._subplots.AxesSubplot at 0x11ff8ab90>

out2

对于 shift,不光结果正确,而且执行时能利用多核、多卡和分布式的能力。

In [3]: df.shift(1).head(10).execute()                                          
Out[3]: 
         Date  Hour Origin Destination  Trip Count
0         NaN   NaN    NaN         NaN         NaN
1  2019-01-01   0.0   12TH        16TH         4.0
2  2019-01-01   0.0   12TH        ANTC         1.0
3  2019-01-01   0.0   12TH        BAYF         1.0
4  2019-01-01   0.0   12TH        CIVC         2.0
5  2019-01-01   0.0   12TH        COLM         1.0
6  2019-01-01   0.0   12TH        COLS         1.0
7  2019-01-01   0.0   12TH        CONC         1.0
8  2019-01-01   0.0   12TH        DALY         1.0
9  2019-01-01   0.0   12TH        DELN         2.0

不只是 DataFrame

Mars 还包含 tensor 模块来支持并行和分布式化 numpy,以及 learn 模块来并行和分布式化 scikit-learn,因此可以想象,如 mars.tensor.linalg.svd 可以直接作用在 Mars DataFrame 上,这就赋予了 Mars 超越 DataFrame 本身的语义。

In [1]: import mars.dataframe as md                                             

In [2]: import mars.tensor as mt                                                

In [3]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000))           

In [5]: mt.linalg.svd(df).execute()

总结

《Towards Scalable DataFrame Systems》赋予了 DataFrame 学术定义。而要做到可扩展的 DataFrame,首先必须是真正的 DataFrame,其次才是可扩展。

在我们看来,Mars 是真正的 DataFrame,它生来目标就是可扩展,而 Mars 又不仅仅是 DataFrame。在我们看来,Mars 在数据科学领域大有可为。

Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。

参考

如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。

IMG_8215

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
7月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
277 0
|
7月前
|
SQL 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
172 0
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
127 1
|
分布式计算 大数据 Spark
大数据Spark DataFrame/DataSet常用操作4
大数据Spark DataFrame/DataSet常用操作4
67 0
|
分布式计算 大数据 Spark
大数据Spark DataFrame/DataSet常用操作3
大数据Spark DataFrame/DataSet常用操作3
95 0
|
分布式计算 大数据 Spark
大数据Spark DataFrame/DataSet常用操作2
大数据Spark DataFrame/DataSet常用操作2
82 0
|
20天前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
58 0
|
4月前
|
SQL 存储 分布式计算
|
7月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
174 0