PyOdps 0.4版本发布,从一个故事说起

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: PyOdps 0.4版本,DataFrame API支持使用pandas进行本地计算,用户因此能join ODPS和本地数据,也能进行本地debug,另外还有MapReduce API等新特性

有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。

“帮我查下这些用户的消耗呢”。

开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。

“有点麻烦啊”,开发同学轻轻抱怨。

“我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。

“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。

这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。

但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。

为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。

可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。

首先,我们读出本地Excel文件。

QQ20160406_0_2x

In [14]: from odps.df import read_excel

In [15]: users = read_excel('/Users/chine/userids.xlsx')

In [16]: users.head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[16]: 
    id  age
0   46   27
1  917   22
2  217   22
3  889   24
4  792   40
5  267   23
6  626   23
7  433   27
8  751   24
9  932   58

In [40]: users.count()
|==========================================|   1 /  1  (100.00%)         0s
100

然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。

In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

In [18]: ratings.head(10)
|==========================================|   1 /  1  (100.00%)         2s
Out[18]: 
   user_id  movie_id  rating  unix_timestamp
0      196       242       3       881250949
1      186       302       3       891717742
2       22       377       1       878887116
3      244        51       2       880606923
4      166       346       1       886397596
5      298       474       4       884182806
6      115       265       2       881171488
7      253       465       5       891628467
8      305       451       3       886324817
9        6        86       3       883603013

In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age]  
# 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection

In [26]: filter_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        44s
Out[26]: 
   user_id  movie_id  rating  unix_timestamp  age
0        3       350       3       889237076   23
1        3       332       1       889237224   23
2        3       327       4       889237455   23
3        3       341       1       889237055   23
4        3       317       2       889237482   23
5        3       336       1       889237198   23
6        3       322       3       889237269   23
7        3       323       2       889237269   23
8        3       339       3       889237141   23
9        3       268       3       889236961   23

然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。

In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())

In [29]: age_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        30s
Out[29]: 
   age  rating_mean
0   20     4.002309
1   21     4.051643
2   22     3.227513
3   23     3.519174
4   24     3.481013
5   25     3.774744
6   26     3.391509
7   27     3.355130
8   28     3.382883
9   29     3.705660

In [30]: age_ratings.plot(kind='bar', rot=45)
|==========================================|   1 /  1  (100.00%)        29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>

age_ratings

超级简单,有木有!

这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。

而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。

DataFrame API使用pandas计算

我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。

所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。

In [41]: import numpy as np

In [42]: import pandas as pd

In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))

In [45]: pandas_df
Out[45]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741
3  0.315234  0.907264  0.849361
4  0.678395  0.642199  0.746051
5  0.977872  0.841084  0.931561
6  0.903927  0.846036  0.982424
7  0.347098  0.373247  0.193810
8  0.672611  0.242942  0.381713
9  0.461411  0.687164  0.514689

In [46]: df = DataFrame(pandas_df)

In [49]: type(df)
Out[49]: odps.df.core.DataFrame

In [47]: df.head(3)
|==========================================|   1 /  1  (100.00%)         0s
Out[47]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741

In [48]: df[df.a < 0.5].a.sum()
|==========================================|   1 /  1  (100.00%)         0s
1.2770121422535428

这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。

这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以用写出以下代码:

DEBUG = True

if DEBUG:
    # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。
    df = ratings[:100].to_pandas(wrap=True)
else:
    df = ratings

在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制

目前pandas计算后端尚不支持窗口函数。

apply和MapReduce API

使用apply对单行数据调用自定义函数

以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。

In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================|   1 /  1  (100.00%)      1m44s
Out[13]: 
        rda
0  0.166667
1  0.166667
2  0.208333
3  0.208333
4  0.125000
5  0.208333
6  0.166667
7  0.208333
8  0.208333
9  0.125000

reduce为True的时候,会返回一个sequence,详细参考文档

MapReduce API

PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。

现在假设我们需要求出每部电影前两名的评分,直接上代码。

from odps.df import output

@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])
def mapper(row):
    yield row.movie_id, row.title, row.rating

@output(['title', 'top_rating'], ['string', 'int'])
def reducer(keys):
    i = [0]
    def h(row, done):
        if i[0] < 2:
            yield row.movie_title, row.movie_rating
        i[0] += 1
    return h

In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)

In [10]: top_ratings.head(10)
|==========================================|   1 /  1  (100.00%)      3m48s
Out[10]: 
               title  top_rating
0   Toy Story (1995)           5
1   Toy Story (1995)           5
2   GoldenEye (1995)           5
3   GoldenEye (1995)           5
4  Four Rooms (1995)           5
5  Four Rooms (1995)           5
6  Get Shorty (1995)           5
7  Get Shorty (1995)           5
8     Copycat (1995)           5
9     Copycat (1995)           5

利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!

In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================|   1 /  1  (100.00%)         2s

In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[23]: 
                                               title  top_rating
0  Shanghai Triad (Yao a yao yao dao waipo qiao) ...           5
1                              Twelve Monkeys (1995)           4
2                               Seven (Se7en) (1995)           4
3                         Usual Suspects, The (1995)           5
4                                 Postino, Il (1994)           3
5                          Mr. Holland's Opus (1995)           4
6                                 Taxi Driver (1976)           5
7                                       Crumb (1994)           5
8                                   Star Wars (1977)           5
9                                   Star Wars (1977)           5

cache机制

在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。

In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()

In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================|   1 /  1  (100.00%)        35s
Out[26]: 
                  title  movie_id
0  Seven (Se7en) (1995)        11
1  Event Horizon (1997)       260
2      Star Wars (1977)        50

In [27]: tmpdf.count()  # tmpdf已经被cache,所以我们能立刻计算出数量
|==========================================|   1 /  1  (100.00%)         0s
99823

记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。

而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。

其他特性

PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。

PyOdps下一步计划

对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。

PyOdps非常年轻,期待大家来使用、提feature、贡献代码。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
8月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之dataworks中lasticseatch8.9和logstash版本兼容问题如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
DataWorks 数据可视化 前端开发
《阿里云飞天大数据平台 DataWorks 前端技术解密:工作流调度可视化》(脱敏版本)
## ![image.png](https://intranetproxy.alipay.com/skylark/lark/0/2021/png/13481/1614773723538-e8d99a86-b04d-47bb-86ad-90cdb07ac657.png#height=220&id=QQWI7&margin=%5Bobject%20Object%5D&name=image.png&or
842 0
|
8月前
|
DataWorks Oracle 关系型数据库
DataWorks操作报错合集之尝试从Oracle数据库同步数据到TDSQL的PG版本,并遇到了与RAW字段相关的语法错误,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
111 0
|
6月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
8月前
|
分布式计算 DataWorks 大数据
DataWorks常见问题之地域版本确认如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
DataWorks
DataWorks中如何查看本地的代码版本(线上未保存)
DataWorks中如何查看本地的代码版本(线上未保存)
115 1
|
存储 分布式计算 DataWorks
阿里云 DataWorks数据集成 的开源版本DataX
DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。
阿里云 DataWorks数据集成 的开源版本DataX
|
Web App开发 开发者 Python