Mars 开源月报(2020.3)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 本月,Mars 发布了 0.4.0b1 ,0.4.0b2 和 0.3.2 以及 0.3.3,点击链接查看详细的 Release Notes。本月两次发布版本是特殊情况,0.4.0b2 修复了 0.4.0b1 中比较紧急的问题。

本月,Mars 发布了 0.4.0b10.4.0b20.3.2 以及 0.3.3,点击链接查看详细的 Release Notes。本月两次发布版本是特殊情况,0.4.0b2 修复了 0.4.0b1 中比较紧急的问题。

Mars 项目发布周期

这里先简述下 Mars 的版本发布周期。Mars 以一个月为发布周期,采用双版本发布策略,一般会同时发布 Pre-release 版本和正式版。Pre-release 版本里会包含更多激进的功能或改动,可能会不稳定,而开发中我们认为稳定的功能或增强会被同步到正式版里。

查看 Github 项目的 milestones 可以看到最新的 Pre-release 和正式版本。

查看 Github Projects 页面 可以看到归类的 issues 和 PRs。

image

v0.4 Release 是我们按版本归档的进行中的 issues 和 PRs。其他则是按模块划分。

新版本功能 Highlight

新版本我们花了大量时间来完善 DataFrame API,经过这个版本的努力,pandas 中的一些常见的接口都得到了支持。

更完善的聚合和分组聚合

  • #1030Groupby.aggregate 支持传入多个聚合函数。
  • #1054 支持了 DataFrame.aggregateSeries.aggregate
  • #1019#1069 支持了 cummax 等累积计算。

举个例子,在 pandas 中我们可以对 movielens 的数据 执行如下操作:

In [1]: import pandas as pd                                                     

In [2]: %%time 
   ...: df = pd.read_csv('Downloads/ml-20m/ratings.csv') 
   ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']}) 
   ...:  
   ...:                                                                         
CPU times: user 5.41 s, sys: 1.28 s, total: 6.7 s
Wall time: 4.3 s
Out[2]: 
        rating                         
           max  min      mean       std
movieId                                
1          5.0  0.5  3.921240  0.889012
2          5.0  0.5  3.211977  0.951150
3          5.0  0.5  3.151040  1.006642
4          5.0  0.5  2.861393  1.095702
5          5.0  0.5  3.064592  0.982140
...        ...  ...       ...       ...
131254     4.0  4.0  4.000000       NaN
131256     4.0  4.0  4.000000       NaN
131258     2.5  2.5  2.500000       NaN
131260     3.0  3.0  3.000000       NaN
131262     4.0  4.0  4.000000       NaN

[26744 rows x 4 columns]

我们根据电影的 ID 进行聚合,求用户评价的最大、最小、平均值以及标准差。

使用 Mars 则可以:

In [1]: import mars.dataframe as md                                             

In [2]: %%time 
   ...: df = md.read_csv('Downloads/ml-20m/ratings.csv') 
   ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']}).execute() 
   ...:  
   ...:                                                                         
CPU times: user 5.81 s, sys: 6.9 s, total: 12.7 s
Wall time: 1.54 s
Out[2]: 
        rating                         
           max  min      mean       std
movieId                                
1          5.0  0.5  3.921240  0.889012
2          5.0  0.5  3.211977  0.951150
3          5.0  0.5  3.151040  1.006642
4          5.0  0.5  2.861393  1.095702
5          5.0  0.5  3.064592  0.982140
...        ...  ...       ...       ...
131254     4.0  4.0  4.000000       NaN
131256     4.0  4.0  4.000000       NaN
131258     2.5  2.5  2.500000       NaN
131260     3.0  3.0  3.000000       NaN
131262     4.0  4.0  4.000000       NaN

[26744 rows x 4 columns]

代码几乎一致,除了 Mars 需要通过 execute() 触发执行。

ratings.csv 有 500M+,使用 Mars 在我的笔记本上运行就可以有数倍加速。当数据量更大的时候,使用 Mars 还可以有更好的加速效果,如果单机无法胜任,也可以使用 Mars 分布式用一致的代码加速执行。

排序

  • #1053 支持了 sort_index
  • #1046 支持了 sort_values

还是以 movielens 数据 为例。

In [1]: import pandas as pd                                                                                               

In [2]: %%time 
   ...: ratings = pd.read_csv('Downloads/ml-20m/ratings.csv') 
   ...: movies = pd.read_csv('Downloads/ml-20m/movies.csv') 
   ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) 
   ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') 
   ...: result.sort_values(by='rating', ascending=False) 
   ...:  
   ...:                                                                                                                   
CPU times: user 5.17 s, sys: 1.13 s, total: 6.3 s
Wall time: 4.05 s
Out[2]: 
       movieId  rating                                  title
19152    95517     5.0      Barchester Chronicles, The (1982)
21842   105846     5.0                   Only Daughter (2013)
17703    89133     5.0                   Boys (Drenge) (1977)
21656   105187     5.0              Linotype: The Film (2012)
21658   105191     5.0                    Rocaterrania (2009)
...        ...     ...                                    ...
26465   129784     0.5            Xuxa in Crystal Moon (1990)
18534    92479     0.5         Kisses for My President (1964)
26475   129834     0.5  Tom and Jerry: The Lost Dragon (2014)
24207   115631     0.5             Alone for Christmas (2013)
25043   119909     0.5                  Sharpe's Eagle (1993)

[26744 rows x 3 columns]

主要目标是将数据集中的电影按平均分从高到低进行排列。

到 Mars 这边,代码还是几乎一致。

In [1]: import mars.dataframe as md                                                                                       

In [2]: %%time 
   ...: ratings = md.read_csv('Downloads/ml-20m/ratings.csv') 
   ...: movies = md.read_csv('Downloads/ml-20m/movies.csv') 
   ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) 
   ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') 
   ...: result.sort_values(by='rating', ascending=False).execute() 
   ...:  
   ...:                                                                                                                   
CPU times: user 4.97 s, sys: 6.01 s, total: 11 s
Wall time: 1.39 s
Out[2]: 
       movieId  rating                                  title
19152    95517     5.0      Barchester Chronicles, The (1982)
21842   105846     5.0                   Only Daughter (2013)
17703    89133     5.0                   Boys (Drenge) (1977)
21656   105187     5.0              Linotype: The Film (2012)
21658   105191     5.0                    Rocaterrania (2009)
...        ...     ...                                    ...
26465   129784     0.5            Xuxa in Crystal Moon (1990)
18534    92479     0.5         Kisses for My President (1964)
26475   129834     0.5  Tom and Jerry: The Lost Dragon (2014)
24207   115631     0.5             Alone for Christmas (2013)
25043   119909     0.5                  Sharpe's Eagle (1993)

[26744 rows x 3 columns]

Mars 的排序采用了并行正则采样排序算法,在我们的文章(链接)中已经做了介绍,这里不再赘述。

更完善的索引支持

Mars 在之前的版本中就支持了 iloc,现在我们也支持了其他的索引方法。

  • #1042 中支持了 loc
  • #1101 中支持了 atiat
  • #1073 中支持了 md.date_range 方法。

通过 loc 的支持,使得基于索引的数据的查找更加方便。

In [1]: import mars.dataframe as md 
  
In [3]: import mars.tensor as mt

In [8]: df = md.DataFrame(mt.random.rand(10000, 10), index=md.date_range('2000-1-1', periods=10000))                      

In [9]: df.loc['2020-3-25'].execute()                                                                                     
Out[9]: 
0    0.372354
1    0.139235
2    0.511007
3    0.102200
4    0.908454
5    0.144455
6    0.290627
7    0.248334
8    0.912666
9    0.830526
Name: 2020-03-25 00:00:00, dtype: float64

自定义函数、字符串和时间处理

  • #1038 增加了 apply 的支持。
  • #1063 支持了 md.Series.strmd.Series.dt来处理字符串和时间列。

我们可以利用 apply 来计算每个城市(数据集)到杭州(东经120°12′,北纬30°16′)的距离。

In [1]: import numpy as np                                                                                                

In [2]: def haversine(lat1, lon1, lat2, lon2): 
   ...:     dlon = np.radians(lon2 - lon1) 
   ...:     dlat = np.radians(lat2 - lat1) 
   ...:     a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2 
   ...:     c = 2 * np.arcsin(np.sqrt(a)) 
   ...:     r =  6371 
   ...:     return c * r 
   ...:                                                                                                                   

In [4]: import mars.dataframe as md                                                                                       

In [5]: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M', dtype={'Region': object}
   ...: )                                                                                                                 

In [6]: df.execute(fetch=False)                                                                                           

In [8]: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute()                                                                                                                 
Out[8]: 
0          9789.135208
1          9788.270528
2          9788.270528
3          9788.270528
4          9789.307210
              ...     
248061    10899.720735
248062    11220.703197
248063    10912.645753
248064    11318.038981
248065    11141.080171
Length: 3173958, dtype: float64

移动窗口函数

  • #1045 增加了 rolling 移动窗口的支持。

移动窗口函数在金融领域使用频率很高,rolling 是在一个固定长度(也可能是固定的时间间隔)上进行一些聚合计算。以下是一个例子。

In [1]: import pandas_datareader.data as web                                                                                                                      

In [2]: data = web.DataReader("^TWII", "yahoo", "2000-01-01","2020-03-25")                                                                                        

In [3]: import mars.dataframe as md                                                                                                                               

In [4]: df = md.DataFrame(data)                                                                                                                                   

In [5]: df.rolling(10, min_periods=1).mean().execute()                                                                                                            
Out[5]: 
                    High           Low          Open         Close     Volume     Adj Close
Date                                                                                       
2000-01-04   8803.610352   8642.500000   8644.910156   8756.549805        0.0   8756.517578
2000-01-05   8835.645020   8655.259766   8667.754883   8803.209961        0.0   8803.177734
2000-01-06   8898.426758   8714.809896   8745.356445   8842.816732        0.0   8842.784180
2000-01-07   8909.012451   8720.964844   8772.374756   8844.580078        0.0   8844.547607
2000-01-10   8952.413867   8755.129883   8806.285742   8896.183984        0.0   8896.151172
...                  ...           ...           ...           ...        ...           ...
2020-03-19  10423.317090  10083.132910  10370.730078  10180.533887  4149640.0  10180.533887
2020-03-20  10202.623047   9833.786914  10105.280078   9971.761914  4366130.0   9971.761914
2020-03-23   9983.399023   9611.036914   9885.659082   9763.000977  3990040.0   9763.000977
2020-03-24   9821.716016   9436.392969   9703.275098   9591.208984  3927690.0   9591.208984
2020-03-25   9685.129980   9290.444922   9543.636035   9466.308984  4003760.0   9466.308984

[4974 rows x 6 columns]

下一个版本计划

下一个版本会是 0.4.0rc1 和 0.3.4,我们仍然会专注提升 DataFrame API 的覆盖率和性能,提升稳定性,并增加文档。

如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。

IMG_8215

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
24 2
|
7月前
|
SQL C++ 开发者
【技术解析 | 实践】Havenask-UDF定制
本节分享 Havenask UDF定制相关的内容,共包含3个部分,分关于 Havenask 的 UDF 相关的介绍、自定义 UDF 的开发及配置方法的介绍,最后将进行 UDF 定制的实际操作演示。
56679 1
|
SQL 存储 分布式计算
阿里云EMR 2.0:兼容开源,贡献开源,超越开源
本文整理自阿里云资深技术专家吴威(无谓)在 阿里云EMR2.0线上发布会 的分享。本文从开源的角度出发,分享了阿里云EMR团队的工作。
1087 0
阿里云EMR 2.0:兼容开源,贡献开源,超越开源
|
Python 机器学习/深度学习 数据处理
Mars学习笔记(一)
对阿里开源的、基于Python的大规模计算框架Mars的学习笔记系列。
926 0
Mars学习笔记(一)
|
人工智能 分布式计算 Cloud Native
向量化执行引擎框架 Gluten 宣布正式开源,并亮相 Spark 技术峰会
向量化执行引擎框架 Gluten 宣布正式开源,并亮相 Spark 技术峰会
885 0
向量化执行引擎框架 Gluten 宣布正式开源,并亮相 Spark 技术峰会
|
机器学习/深度学习 算法 数据可视化
「直播回顾」Mars应用与最佳实践
本文首先对Mars的概念、功能、优势进行了介绍,随后,对Mars几个典型的应用场景进行介绍,并通过两个Demo展示了在使用Mars后数据科学性能的提升,最后总结了Mars的最佳实践,让使用Mars更高效便捷。
6852 0
「直播回顾」Mars应用与最佳实践
|
机器学习/深度学习 分布式计算 数据可视化
「直播回顾」Mars:加速数据科学的新方式
本文从数据科学概念、背景和现状切入,引出加速数据科学的新方式Mars,并介绍了Mars具体能解决的一些问题和背后的逻辑、哲学,同时对Mars整体数据处理流程进行了介绍。
2793 0
「直播回顾」Mars:加速数据科学的新方式
|
机器学习/深度学习 分布式计算 并行计算
10月15日社区直播【Intel MLlib:构建平台优化的Spark机器学习】
Intel MLlib是一个为Apache Spark MLlib优化的软件包。它在保持和Spark MLlib兼容的同时,在底层利用原生算法库来实现在CPU和GPU上的最优化算法,同时使用Collective Communication来实现效率更高的节点间通信。我们的初步结果表明,该软件包在最小化应用改动的基础上,可以极大地提升MLlib算法的性能。
10月15日社区直播【Intel MLlib:构建平台优化的Spark机器学习】
|
分布式计算 算法 大数据
MaxCompute Mars开发指南
Mars 算法实践 人脸识别 Mars 是一个基于矩阵的统一分布式计算框架 ,而且 Mars 已经在 GitHub 中开源。当你看完 Mars 的介绍可能会问它能做什么,这几乎取决于你想做什么,因为 Mars 作为底层运算库,实现了 numpy 70% 的常用接口。
3089 0
|
存储 人工智能 分布式计算