Spark DataFrame 不是真正的 DataFrame-阿里云开发者社区

开发者社区> 阿里巴巴大数据计算> 正文

Spark DataFrame 不是真正的 DataFrame

简介: 最早的 "DataFrame" ,来源于贝尔实验室开发的 S 语言。R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。

文章原载于 Mars 团队专栏,欢迎关注。

从这篇文章开始,我们开始一个新的读 paper 系列。

今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。

个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。

这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分:

  1. 什么是真正的 DataFrame?
  2. 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。
  3. 从 Mars DataFrame 的角度来看这个问题。

什么是真正的 DataFrame?

起源

最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data frame" 在 1990 年就发布了,书《S 语言统计模型》第3章里详述了它的概念,书里着重强调了 dataframe 的矩阵起源。

书中描述 DataFrame 看上去很像矩阵,且支持类似矩阵的操作;同时又很像关系表。

R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。这些 DataFrame 都同宗同源,有着相同的语义和数据模型。

1_i8wjdd4yhrbV6iBmXmRcQg

DataFrame 数据模型

DataFrame 的需求来源于把数据看成矩阵和表。但是,矩阵中只包含一种数据类型,未免过于受限;同时,关系表要求数据必须要首先定义 schema。对于 DataFrame 来说,它的列类型可以在运行时推断,并不需要提前知晓,也不要求所有列都是一个类型。因此,DataFrame 可以理解成是关系系统、矩阵、甚至是电子表格程序(典型如 Excel)的合体。

1_F_HYvbpOXK1LnKkPC9b6ZA

跟关系系统相比,DataFrame 有几个特别有意思的属性,让 DataFrame 因此独一无二。

保证顺序,行列对称

首先,无论在行还是列方向上,DataFrame 都是有顺序的;且行和列都是一等公民,不会区分对待。

拿 pandas 举例子,当创建了一个 DataFrame 后,无论行和列上数据都是有顺序的,因此,在行和列上都可以使用位置来选择数据。

In [1]: import pandas as pd                                                     

In [2]: import numpy as np                                                      

In [3]: df = pd.DataFrame(np.random.rand(5, 4))                                 

In [4]: df                                                                      
Out[4]: 
          0         1         2         3
0  0.736385  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

In [5]: df.iat[2, 2]  # 第二行第二列元素                                                             
Out[5]: 0.40278182653648853

因为行和列的对称关系,因此聚合函数在两个方向上都可以计算,只需指定 axis 即可。

In [6]: df.sum()  # 默认 axis == 0,在行方向上做聚合,因此结果是4个元素                                                                
Out[6]: 
0    2.138135
1    2.961325
2    2.508257
3    2.458257
dtype: float64

In [7]: df.sum(axis=1)  # axis == 1,在列方向上做聚合,因此是5个元素                                                        
Out[7]: 
0    2.874434
1    2.266533
2    1.454032
3    2.201998
4    1.268976
dtype: float64

如果熟悉 numpy(数值计算库,包含多维数组和矩阵的定义),可以看到这个特性非常熟悉,从而可以看出 DataFrame 的矩阵本质。

丰富的 API

DataFrame 的 API 非常丰富,横跨关系(如 filter、join)、线性代数(如 transpose、dot)以及类似电子表格(如 pivot)的操作。

还是以 pandas 为例,一个 DataFrame 可以做转置操作,让行和列对调。

In [8]: df.transpose()                                                          
Out[8]: 
          0         1         2         3         4
0  0.736385  0.319533  0.440825  0.300279  0.341113
1  0.271232  0.891928  0.500724  0.483571  0.813870
2  0.940270  0.471176  0.402782  0.639299  0.054731
3  0.926548  0.583895  0.109702  0.778849  0.059262

直观的语法,适合交互式分析

用户可以对 DataFrame 数据不断进行探索,查询结果可以被后续的结果复用,可以非常方便地用编程的方式组合非常复杂的操作,很适合交互式的分析。

列中允许异构数据

DataFrame 的类型系统允许一列中有异构数据的存在,比如,一个 int 列中允许有 string 类型数据存在,它可能是脏数据。这点看出 DataFrame 非常灵活。

In [10]: df2 = df.copy()                                                        

In [11]: df2.iloc[0, 0] = 'a'                                                   

In [12]: df2                                                                    
Out[12]: 
          0         1         2         3
0         a  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

数据模型

现在我们可以对什么是真正的 DataFrame 正式下定义:

1_mTEG3yv1jqGfqlizFuQvaw

DataFrame 由二维混合类型的数组、行标签、列标签、以及类型(types 或者 domains)组成。在每列上,这个类型是可选的,可以在运行时推断。从行上看,可以把 DataFrame 看做行标签到行的映射,且行之间保证顺序;从列上看,可以看做列类型到列标签到列的映射,同样,列间同样保证顺序。

行标签和列标签的存在,让选择数据时非常方便。

In [13]: df.index = pd.date_range('2020-4-15', periods=5)                       

In [14]: df.columns = ['c1', 'c2', 'c3', 'c4']                                  

In [15]: df                                                                     
Out[15]: 
                  c1        c2        c3        c4
2020-04-15  0.736385  0.271232  0.940270  0.926548
2020-04-16  0.319533  0.891928  0.471176  0.583895
2020-04-17  0.440825  0.500724  0.402782  0.109702
2020-04-18  0.300279  0.483571  0.639299  0.778849
2020-04-19  0.341113  0.813870  0.054731  0.059262

In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3']  # 注意这里的切片是闭区间                         
Out[16]: 
                  c2        c3
2020-04-16  0.891928  0.471176
2020-04-17  0.500724  0.402782
2020-04-18  0.483571  0.639299

这里的 indexcolumns 就分别是行和列标签。我们可以很容易选择一段时间(行上选择)和几列(列上选择)数据。当然这些建立在数据是按顺序存储的基础上。

按顺序存储的特性让 DataFrame 非常适合用来做统计方面的工作。

In [17]: df3 = df.shift(1)  # 把 df 的数据整体下移一格,行列索引保持不变                                                      

In [18]: df3                                                                    
Out[18]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16  0.736385  0.271232  0.940270  0.926548
2020-04-17  0.319533  0.891928  0.471176  0.583895
2020-04-18  0.440825  0.500724  0.402782  0.109702
2020-04-19  0.300279  0.483571  0.639299  0.778849

In [19]: df - df3  # 数据减法会自动按标签对齐,因此这一步可以用来计算环比                                                             
Out[19]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

In [21]: (df - df3).bfill()  # 第一行的空数据按下一行填充                                                    
Out[21]: 
                  c1        c2        c3        c4
2020-04-15 -0.416852  0.620697 -0.469093 -0.342653
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

从例子看到,正因为数据是按顺序存放的,因此我们可以索引保持不变,整体下移一行,这样,昨天的数据就到了今天的行上,然后拿原数据减去位移后的数据时,因为 DataFrame 会自动按标签做对齐,因此,对于一个日期,相当于用当天的数据减去了前天的数据,这样就可以做类似于环比的操作。这简直太方便了。试想,对于关系系统来说,恐怕需要想办法找一列作为 join 的条件,然后再做减法等等。最后,对于空数据,我们还可以填充上一行(ffill)或者下一行的数据(bfill)。想在关系系统里想达到同样效果,想必是需要大费周章的。

DataFrame 的真正含义正在被杀死

近几年,DataFrame 系统如同雨后春笋般出现,然而,这其中的绝大多数系统只包含了关系表的语义,并不包含我们之前说的矩阵方面的意义,且它们大多也并不保证数据顺序,因此真正 DataFrame 所拥有的统计和机器学习方面的特质也不复存在。这些 “DataFrame” 系统的出现,让 “DataFrame” 这个词本身几乎变得没有意义。数据科学家们为了处理大规模的数据,思维方式不得不作出改变,这其中必然存在风险。

Spark DataFrame 和 Koalas 不是真正的 DataFrame

这些 DataFrame 系统的代表是 Spark DataFrame, Spark 当然是伟大的,它解决了数据规模的问题;同时又首次把 ”DataFrame“ 的概念带到了大数据的领域。但其实它只是 spark.sql的另一种形式(当然 Spark DataFrame 确实在 spark.sql 下)。Spark DataFrame 只包含了关系表的语义,schema 需要确定,数据也并不保证顺序。

1_i4w87MsbSSi7lHw_csDaMQ

那么会有同学说 Koalas 呢?Koalas 提供了 pandas API,用 pandas 的语法就可以在 spark 上分析了。实际上,因为 Koalas 也是将 pandas 的操作转成 Spark DataFrame 来执行,因为 Spark DataFrame 内核本身的特性,注定 Koalas 只是看上去和 pandas 一致。

为了说明这点,我们使用 数据集Hourly Ridership by Origin-Destination Pairs),只取 2019 年的数据。

对于 pandas,我们按天聚合,并按 30 天滑动窗口来计算平均值。

In [22]: df = pd.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', 
    ...: names=['Date','Hour','Origin','Destination','Trip Count'])             

In [23]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()      
Out[23]: <matplotlib.axes._subplots.AxesSubplot at 0x118077d90>

out2

如果是 Koalas,因为它的 API 看上去和 pandas 一致,因此,我们按照 Koalas 的文档做 import 替换。

In [1]: import databricks.koalas as ks
  
In [2]: df = ks.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', names=['Date','Hour','Origin','Destination','Trip Count'])
  
In [3]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot()

out

然后令人惊讶的是,结果并不一致。大费周章后才查到,原因是顺序问题,聚合的结果后并不保证排序,因此要得到一样的结果需要在 rolling 前加 sort_index(),确保 groupby 后的结果是排序的。

In [4]: df.groupby('Date').mean()['Trip Count'].sort_index().rolling(30).mean().plot()

默认的排序规则非常重要,这对以时间作为索引的数据尤其关键,而且这让数据科学家更容易观察数据,也更容易复现结果。

所以,在使用 Koalas 时请小心,要时刻关注你的数据在你心中是不是排序的,因为 Koalas 很可能表现地和你想的不一致。

让我们再看 shift,它能工作的一个前提就是数据是排序的,那么在 Koalas 中调用会发生什么呢?

In [6]: df.shift(1)                                                             
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o110.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING))' due to data type mismatch: argument 1 requires (double or float) type, however, 'lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)' is of timestamp type.;;
'Project [__index_level_0__#41, CASE WHEN (isnull(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)))) THEN null ELSE lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Date#87, CASE WHEN (isnull(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Hour#88, CASE WHEN (isnull(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Origin#89, CASE WHEN (isnull(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Destination#90, CASE WHEN (isnull(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Trip Count#91, __natural_order__#50L]
+- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS __natural_order__#50L]
   +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34]
      +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, _w0#42L, _we0#43, (_we0#43 - 1) AS __index_level_0__#41]
         +- Window [row_number() windowspecdefinition(_w0#42L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#43], [_w0#42L ASC NULLS FIRST]
            +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS _w0#42L]
               +- Project [0#20 AS Date#30, 1#21 AS Hour#31, 2#22 AS Origin#32, 3#23 AS Destination#33, 4#24 AS Trip Count#34]
                  +- Project [_c0#10 AS 0#20, _c1#11 AS 1#21, _c2#12 AS 2#22, _c3#13 AS 3#23, _c4#14 AS 4#24]
                     +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:116)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277)
    at 
  ...  # 后面还有一大段报错信息,此处省略

这个报错可能会让数据科学家们震惊,什么,我就做了个 shift 啊,报错里掺杂着 Java 异常栈和一大堆看不懂的错误。

这里真正的错误和 Date 是时间戳有关,那么我们只取 int 类型的字段做 shift 总可以了吧。

In [10]: df['Hour'].shift(1)                                                    
Out[10]: 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

确实可以运行,但却看到一句话,大意是数据会被放到一个分区来执行,这正是因为数据本身之间并不保证顺序,因此只能把数据收集到一起,排序,再调用 shift。这样就不再是一个分布式的程序了,甚至比 pandas 本身更慢。

DataFrame.dot 等矩阵相关的操作在 Koalas 里也不包含,这些操作已经很难用关系代数来表达了。

PyODPS DataFrame

相信用过 MaxCompute(原名 ODPS,阿里云自研的大数据系统),应该会听说过 PyODPS。这个库是我们前几年的产品,PyODPS 里也包含一个 DataFrame,而 PyODPS DataFrame 在执行的时候会被编译到 ODPS SQL 来执行。

提 PyODPS DataFrame 的原因是,我们在几年前就发现,虽然它提供了 pandas-like 的接口,一定程度上让用户能用类似 pandas 的思维解决问题,然而,当用户问我们,如何向后填充数据?如何通过索引获取数据?答案都是不能。原因也是一样的,因为 PyODPS DataFrame 只是将计算代理给不保证有序、只有关系代数算子的引擎来执行。

如果系统本身的数据模型不是真正的 DataFrame 模型,仅仅让接口看起来像是远远不够的。

Mars DataFrame

因此这里要说到 Mars DataFrame,其实我们做 Mars 的初衷和这篇 paper 的想法是一致的,因为现有的系统虽然能很好地解决规模问题,但那些传统数据科学包中好的部分却被人遗忘了,我们希望 Mars 能保留这些库中好的部分,又能解决规模问题,也能充分利用新硬件。

Mars DataFrame 会自动将 DataFrame 分割成很多小的 chunk,每个 chunk 也是一个 DataFrame,而无论是 chunk 间还是 chunk 内的数据,都保证顺序。

image_20200415170452820

图里的示例中,一个行数 380、列数 370 的 DataFrame,被 Mars 分成 3x3 一共 9 个 chunk,根据计算在 CPU 还是 NVIDIA GPU 上进行,用 pandas DataFrame 或者 cuDF DataFrame 来存储数据和执行真正的计算。可以看到,Mars 既会在行上,也会在列上进行分割,这种在行上和列上的对等性,让 DataFrame 的矩阵本质能得以发挥。

在单机真正执行时,根据初始数据的位置,Mars 会自动把数据分散到多核或者多卡执行;对于分布式,会将计算分散到多台机器执行。

Mars DataFrame 保留了行标签、列标签和类型的概念。因此能够想象如同 pandas 一样,可以在比较大的数据集上根据标签进行筛选。

In [1]: import mars.dataframe as md                                             

In [2]: import mars.tensor as mt

In [8]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000), 
   ...:                   index=md.date_range('2020-1-1', periods=10000))       

In [9]: df.loc['2020-4-15'].execute()                                           
Out[9]: 
0    0.622763
1    0.446635
2    0.007870
3    0.107846
4    0.288893
5    0.219340
6    0.228806
7    0.969435
8    0.033130
9    0.853619
Name: 2020-04-15 00:00:00, dtype: float64

Mars 会保持和 pandas 一致的排序特性,因此对于 groupby 等操作,无需担心结果和所想不一致。

In [6]: import mars.dataframe as md                                             

In [7]: df = md.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', n
   ...: ames=['Date','Hour','Origin','Destination','Trip Count'])               

In [8]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() # 结果正确      
Out[8]: <matplotlib.axes._subplots.AxesSubplot at 0x11ff8ab90>

out2

对于 shift,不光结果正确,而且执行时能利用多核、多卡和分布式的能力。

In [3]: df.shift(1).head(10).execute()                                          
Out[3]: 
         Date  Hour Origin Destination  Trip Count
0         NaN   NaN    NaN         NaN         NaN
1  2019-01-01   0.0   12TH        16TH         4.0
2  2019-01-01   0.0   12TH        ANTC         1.0
3  2019-01-01   0.0   12TH        BAYF         1.0
4  2019-01-01   0.0   12TH        CIVC         2.0
5  2019-01-01   0.0   12TH        COLM         1.0
6  2019-01-01   0.0   12TH        COLS         1.0
7  2019-01-01   0.0   12TH        CONC         1.0
8  2019-01-01   0.0   12TH        DALY         1.0
9  2019-01-01   0.0   12TH        DELN         2.0

不只是 DataFrame

Mars 还包含 tensor 模块来支持并行和分布式化 numpy,以及 learn 模块来并行和分布式化 scikit-learn,因此可以想象,如 mars.tensor.linalg.svd 可以直接作用在 Mars DataFrame 上,这就赋予了 Mars 超越 DataFrame 本身的语义。

In [1]: import mars.dataframe as md                                             

In [2]: import mars.tensor as mt                                                

In [3]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000))           

In [5]: mt.linalg.svd(df).execute()

总结

《Towards Scalable DataFrame Systems》赋予了 DataFrame 学术定义。而要做到可扩展的 DataFrame,首先必须是真正的 DataFrame,其次才是可扩展。

在我们看来,Mars 是真正的 DataFrame,它生来目标就是可扩展,而 Mars 又不仅仅是 DataFrame。在我们看来,Mars 在数据科学领域大有可为。

Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。

参考

如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。

IMG_8215

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
阿里巴巴大数据计算
使用钉钉扫一扫加入圈子
+ 订阅

阿里大数据官方技术圈

官方博客
链接