
程序猿一枚,把梦想揣进口袋的挨踢工作者。主要工作是分布式数据分析(DataFrame并行化框架),以及大规模分布式多维数组计算框架等。
本文主要从以下几个方面展开: Mars简介 典型场景 Demo 最佳实践 一、Mars简介 Mars是统一的数据科学平台,它用来加速传统的Python数据科学技术栈,在单机中也可以用多核加速,或用分布式来加速。Mars可以部署在单机的分布式集群,或者Kubernetes和Hadoop Yarn上。 Mars整个框架构建在单机的并行和分布式的这两个调度的基础之上,它的数据科学基础包括三个核心部分,Tensor、DataFrame和Remote。而构建在这个基础之上的,是Mars Learn模块,它可以兼容Scikit-learn API,能简单地进行更大数据规模的分布式处理。此外,Mars还支持深度学习和机器学习的框架,比如能轻松运行TensorFlow、PyTorch等,而且可视化也可以在Mars上完成。除此之外,Mars还支持了丰富的数据源。 从传统Python技术栈到Mars也非常简单,比如在NumPy和Pandas里要变成Mars,只需要替换import,然后后面变为延迟执行即可。 普通的Python函数,在调用的时候变成mr.spawn来延迟这个过程,最后通过execute来并发执行,不用担心Mars是运行在单机上还是分布式运行。 而Mars上的TensorFlow大部分也一样,区别在于main函数部分的变化。最后,我们需要通过run_tensorflow_script的方式把脚本运行到Mars中。 二、典型场景 场景1. CPU和GPU混合计算 在安全和金融领域可以用Mars做CPU和GPU的混合计算,加速现有的工作流。 在这个领域,因为传统大数据平台挖掘周期长,资源紧张,需要很久来执行任务,不能达到客户需求。所以能用Mars DataFrame来加速数据处理,它可以做大规模数据排序,并帮助用户做高阶统计和聚合分析。 另外在安全领域有很多无监督学习的算法,Mars learn能加速无监督学习,同时拉起分布式深度学习计算加速现有的深度学习训练。之后,对于某些计算任务也可以利用GPU来加速。 场景2. 可解释性计算 在广告领域,在广告归因和洞察特征的解释算法中,因为本身计算量大,所以耗时很长。这种情况下,单机加速是比较困难的,基于传统大数据平台的分布式也不太灵活,但是通过Mars remote,可以很轻松地把计算分布到几十台机器上来加速,达到百倍的性能提升。 场景3. 大规模K-最邻近算法 Mars非常广泛地应用在K-最邻近算法中,因为Embedding越来越流行,它让向量表述实体非常常见。另外,Mars的NearestNeighbors算法兼容scikit-learn,它里面有暴力算法,而用户也需要暴力算法来进行大规模计算,可以通过多个worker来完成,从而让性能提升百倍。最后,Mars支持分布式的方式加速Faiss和Proxima,达到千万级别和上亿级别的规模。 三、Demo Demo1. 分析豆瓣电影数据 我们从这个Demo看一下Mars如何加速pandas数据处理及其可视化。 开始演示之前我们需要安装Mars。这里已经创建了Jupyter,然后 pip install pymars。 安装之后,我们可以到IPython进行验证,可以看到下面的结果没有问题,接下来我们就可以进入到Jupyter notebook里。 我们开始demo。这个数据可以在GitHub地址下载,然后我们用pandas来分析电影的数据,使用ipython memory usage来查看内存使用。 我们的数据主要用到4个CSV文件,分别是movies、ratings、users和comments。 接下来根据上映日期统计有多少电影发布。这里先处理一下数据,让发行日期只取到年份,去掉日期,并对年份做聚合。 数据出来后,可以用pandas bokeh把图绘制出来,并通过交互式的方式查看。 接下来看电影评分的统计。首先把有评分的电影筛选出来,然后把豆瓣评分的数值数量从大到小进行排序。可以看到,最多的评分是6.8分。 同样,通过pandas bokeh把它画成柱状图,评分差不多呈现正态分布。 接下来做一个标签词云,看电影哪个标签词最多,这里从movies取出tags,用斜杠分割,然后max words是50。 接下来我们再对电影的Top K进行分析。首先按电影ID进行聚合,求出评价的平均值和个数。然后我们对评价个数进行过滤,从高到低,算出top20的电影。 然后做评论数据分析。因为评论是中文的,所以需要做一个分词,然后对每一句话做切分,在统计的时候进行排序。这里可以加一个进度条,在处理数据的时候方便看到进程。这个过程大概花了20分钟,所以在单机上跑大任务的时候对机器的压力还是比较大。 这是最终的词云图。 接下来我们用Mars做同样的分析任务。首先是对Mars环境进行部署,然后这里有5个worker,每个worker是8个CPU和32G内存。还是一样,我们打开内存的监控,做一些import,这里把import Pandas替换成import mars.dataframe,然后Numpy是import mars.tensor。 随后我们在SDK里来创建to mars dataframe,这一步几乎没有用到内存,最终得到的结果也和之前一样。 我们用同样的方式来分析上映日期的电影个数和电影评分。得益于Mars跟Pandas的高度兼容,我们也能用Pandas bokeh来呈现结果。 电影评论的分析也一样,但是在显示的时候,Mars只会拉取头几条和最后几条,所以客户端几乎没有内存使用。而且整个running过程只用了45秒,与之前的20分钟相比提升了几十倍性能。 接下来我们用Mars做一个地区的统计,让它有一个动态的效果。首先我们看一下刚刚计算过的已经released的电影dataframe,然后取1980-2019这几年的电影,而regions部分可能有多个,所以用斜杠分割开,最后执行排出top10地域电影。 然后我们通过bar chart race来生成动态效果。 Demo2. 豆瓣电影推荐 第二个demo我们会基于刚才豆瓣电影的数据来做一个推荐。我们首先会用TensorFlow Mars来进行训练,接着用Mars分布式KNN算法来加速召回计算。 我们先使用单机的技术栈,这个数据已经分成了训练和测试集,所以我们先to pandas把它下载到本地,接着来对用户和电影做一个label encode,把它变成一个数字,而不是字符串的值。随后我们对数据进行处理,先按照时间排序,然后按照用户进行分组,生成分组聚合的结果。 接下来开始训练,我们需要用TensorFlow训练出代表user的embedding。之前说过embedding,可以对任一实体用向量描述,所以得到embedding之后,我们在给用户推荐电影时就可以查找在这个向量空间里面跟这个用户比较接近的电影embedding。 训练后我们可以保存向量,这里的搜索规模是60万乘7万,单机花费了22分钟,但如果达到千万乘千万级别,搜索耗时要超过800小时,这是不可接受的。 接下来我们看如何用Mars来实现这一过程。首先创建一个Mars集群,这里有8个worker。然后和上面一样,对数据进行预处理,做label encode,按时间排序,按user分组生成分组聚合。 这里唯一的区别是Mars会自动推断DataFrame的结果,如果推断失败就需要用户自己提供dtypes和output type。 然后是执行和训练。这里TensorFlow可以写Python文件,不用写到notebook里。 接着我们用Mars的run tensorflow script来跑这个脚本,然后指定worker是8。可以看到,执行的时间缩小到了23分钟。同时,我们也拿到了最终的embedding,用Mars做embedding只需1分25秒,比刚刚的时间提升个十倍左右。1400万乘1400万也可以稳定在1小时左右,与单机800个小时相比提升是非常巨大的。 四、最佳实践 首先尽量不要使用to pandas和to numpy,因为这会把Mars的分布式数据变成单机的数据,失去了Mars本身的优势,除非这个操作不能用Mars实现;其次,Mars tensor、DataFrame和learn由于本身受限于API的原因需要自己写一些函数,所以可以考虑用Mars remote来加速,把操作抽象成函数;第三,Pandas的加速技巧在Mars DataFrame依然适用,比如可以使用更高效的数据类型,可以优先使用内建操作,使用apply取代循环。 以上就是今天的课程,欢迎大家继续关注后续内容。
本文分为4个部分: Mars的背景和现状 Mars解决了什么问题 Mars背后的哲学 总结与展望 一、Mars的背景和现状 说到加速数据科学的新方式,就不得不说什么是数据科学,以下是维基的定义: 数据科学通过运用各种相关的数据来帮助非专业人士来理解问题。第一,它的目标是从数据中提取输入价值的部分,第二,它要生产数据产品,第三它结合了非常多重要的技术,包括数学、统计、机器学习、可视化,最后,它需要真正解决问题。 它包含三个方面,计算机科学、数学和统计、领域和商业知识。它们结合起来分别是机器学习、软件开发和传统研究,中间是数据科学。 数据科学技术栈包含语言、数据分析、数据工程、机器学习、Web开发、运维和商业智能。每个技术栈都包含不同的工具,提供不同的数据服务。 传统Python数据科学栈的三大底座是NumPy、Pandas和SciPy。NumPy是最核心的部分,它用来做数值计算,几乎其他所有Python的数据科学技术栈都建立在NumPy上,因为它有最基础的数据结构,也就是多维数组;另外,Pandas也用NumPy实现,它上面有很多API来进行分析操作;而SciPy主要负责科学计算。在此基础上,是机器学习和可视化,同时还有丰富的Python函数。 上面是Python数据科学技术栈整体的状况,他们有几大好处,比如广泛使用,是事实标准;上手成本低,容易为初学者和学生入门;和语言结合紧密,能用Python来组织函数调用。但是它们都是单机的库,不能处理很大的数据量,所以需要用到大数据的数据工程技术栈,比如Hadoop、Hive、Spark等。它们虽然支持多语言,没有强绑定,但是学习门槛比较陡峭,也需要对系统本身有足够的了解。做数据科学需要把精力放在数据本身,而不是工具,但是这几个库让工作围绕着库展开,而非数据。因此,我们是否能把这两个工作连接起来,利用传统的技术价值而不是关注大数据系统本身,来解决很多问题。 现在大家说摩尔定律已经失效,我们可以回顾一下它的历史。早期它通过复杂指令集和精简指令集的方式让速度提升,但是随着缩放比例定律和阿姆达尔定律的终结,未来是不是要20年才能达到2倍效率的提升呢? 另外随着深度学习、机器学习和AI的火热,机器学习模型也会越来越大,它的训练已经呈指数级增长,但是摩尔定律并没有追上机器学习模型训练发展的速度。 另外一个现状就是技术栈的问题,NumPy、Pandas等它们更多只能在单核上来应用计算,但是阿姆达尔定律下的核数是会越来越多,所以并没有很好地进行利用。同时,不管是计算量还是数据规模的增长速度,都超过了摩尔定律的速度,所以也不能用单机解决问题。 基于以上,加速数据科学主要有两种方式。第一是Scale up,利用多核和更好的硬件,比如GPU、FPGA和TPU等,以及现有的库,包括Modin、CuPy、Rapids等来提升效率。第二是 Scale out,利用分布式的方式来加速,因为单核的性能总归是有限的,可以利用Ray、Dask和Modin等。而结合Scale up和Scale out,就可以构建一个大规模的更好的硬件集成,比如今天介绍的加速数据科学的新方式Mars。 二、Mars解决了什么问题 Mars就是我们试图构建的“桥”,能来兼顾小规模和大规模数据处理。大规模数据处理能构建集群,有三种主要方式,第一是在物理机上,第二是kubernetes,第三是Hadoop Yarn的调度器上,拉起Mars的集群。 Mars的核心基础部分对应着传统Python数据技术栈,比如Mars Tensor对应NumPy,DataFrame对应Pandas等。而构建在这个基础之上的,是Mars Learn模块,它可以兼容Scikit-learn API,能简单地进行更大数据规模的分布式处理。此外,Mars还支持深度学习和机器学习的框架,比如能轻松运行TensorFlow、PyTorch等,而且可视化也可以在Mars上宣布。除此之外,Mars还支持了丰富的数据源。 从传统Python技术栈到Mars也非常简单,比如在NumPy和Pandas里要变成Mars,只需要替换import,然后后面变为延迟执行即可。 普通的Python函数,在调用的时候变成mr.spawn来延迟这个过程,最后通过execute来并发执行,不用担心Mars是运行在单机上还是分布式运行。 而Mars上的TensorFlow大部分也一样,区别在于main函数部分的变化。 三、Mars背后的设计哲学 第一是分而治之,不重复造轮子。比如创建一个Mars的Tensor,对里面所有元素进行求和操作,在触发execute之后,Mars会生成小任务的计算图,它可以将小任务调度到分布式的集群中来运行。此外还做了很多优化,通过算子融合来提升性能。 第二,在Mars的世界,一切皆可并行。在Mars里实现了并行正则排序算法,比如创建一个cluster,有5个Worker,每个Worker是8核32G,那么相比单机的NumPy函数,它提升了4倍。 下面看下Mars整体数据的处理流程。我们通过客户端的代码触发执行,生成一个粗粒度的计算图,然后通过Web服务器提交任务,并将任务转发给Scheduler,然后在这里切分成小任务。随后,Scheduler根据Worker工作负载的情况来把它分发到各个机器上运行,这里会用到调度的策略等等。这个过程中,Mars会自动触发数据传输工作,保证在节点执行的时候它所有输入的数据都在这台机器上。此外Mars还会自动进行Spill操作,当我们内存不够的时候,它会把不常用的数据Spill到磁盘上,让Mars可以处理远超过这个内存能放下的数据规模。 四、总结与展望 首先Mars是完全开源的项目,完全遵循开源的规范,而且项目的progress都能在GitHub上看到;其次Mars有兼容性,它的API高度兼容NumPy、Pandas和Scikit-learn;同时,它的单机、多核与分布式的处理性能也更高;最后是交互性,Mars Remote可以把以前的代码变成分布式代码,并且内部也可以与第三方库做集成。 Github地址:https://github.com/mars-project/mars 下面是关于Mars未来的展望,大家可以访问专栏。未来我们的开发计划重心放在提升Mars learn的接口覆盖率和Mars DataFrame接口覆盖率上,统一单机和分布式执行层等等。 以上就是今天的课程,欢迎大家继续关注Mars。戳我观看直播回放
Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。 启动 Mars 分布式环境可以参考: 命令行方式在集群中部署。 Kubernetes 中部署。 MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。 如何使用 Mars Remote API 使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。 拿用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。 from typing import List import numpy as np def calc_chunk(n: int, i: int): # 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数 rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs: List[int], N: int): # 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值 return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi) %%time 下可以看到结果: 3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s 在单机需要 12.3 s。 要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。 import mars.remote as mr # 函数调用改成 mars.remote.spawn fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] # 把 spawn 的列表传入作为参数,再 spawn 新的函数 pi = mr.spawn(calc_pi, args=(fs, N)) # 通过 execute() 触发执行,fetch() 获取结果 print(pi.execute().fetch()) %%time 下看到结果: 3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s 结果一模一样,但是却有数倍的性能提升。 可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。 一个例子 为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。 困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。 准备数据 这个例子里我们使用 otto 数据集。 首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。 import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import train_test_split def gen_data(): df = pd.read_csv('otto/train.csv') X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) X_train, X_test, y_train, y_test = gen_data() 模型 接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。 RandomForest: from sklearn.ensemble import RandomForestClassifier def random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = RandomForestClassifier(verbose=verbose, **kw) model.fit(X_train, y_train) return model 接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。 def gen_random_forest_parameters(): for n_estimators in [50, 100, 600]: for max_depth in [None, 3, 15]: for criterion in ['gini', 'entropy']: yield { 'n_estimators': n_estimators, 'max_depth': max_depth, 'criterion': criterion } LogisticRegression 也是这个过程。我们先定义模型。 from sklearn.linear_model import LogisticRegression def logistic_regression(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = LogisticRegression(verbose=verbose, **kw) model.fit(X_train, y_train) return model 接着生成供 LogisticRegression 使用的超参。 def gen_lr_parameters(): for penalty in ['l2', 'none']: for tol in [0.1, 0.01, 1e-4]: yield { 'penalty': penalty, 'tol': tol } XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。 from xgboost import XGBClassifier def xgb(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = XGBClassifier(verbosity=int(verbose), **kw) model.fit(X_train, y_train) return model 生成一系列超参。 def gen_xgb_parameters(): for n_estimators in [100, 600]: for criterion in ['gini', 'entropy']: for learning_rate in [0.001, 0.1, 0.5]: yield { 'n_estimators': n_estimators, 'criterion': criterion, 'learning_rate': learning_rate } 验证 接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。 from sklearn.metrics import log_loss def metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float: if isinstance(model, bytes): model = pickle.loads(model) y_pred = model.predict_proba(X_test) return log_loss(y_test, y_pred) def train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): # 把训练和验证封装到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params) metric = metric_model(model, X_test, y_test) return model, metric 找出最好的模型 做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。 results = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(random_forest, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(logistic_regression, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(xgb, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) 运行一下,需要相当长时间,我们省略掉一部分输出内容。 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288 # 省略其他模型的输出结果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s 从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。 使用 Remote API 分布式加速 现在我们尝试使用 Remote API 通过分布式方式加速整个过程。 集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。 n_cores = 8 mem = 2 * n_cores # 16G # o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G cluster = o.create_mars_cluster(10, n_cores, mem, image='extended') 为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。 if not o.exist_resource('otto_train.csv'): with open('otto/train.csv') as f: # 上传资源 o.create_resource('otto_train.csv', 'file', fileobj=f) def gen_data(): # 改成从资源读取 df = pd.read_csv(o.open_resource('otto_train.csv')) X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) 稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。 import mars.remote as mr # n_output 说明是 4 输出 # execute() 执行后,数据会读取到 Mars 集群内部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute() # remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data 目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。 def distributed_train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): model, metric = train_and_metric(train_func, train_params, X_train, y_train, X_test, y_test, verbose=verbose) return pickle.dumps(model), metric 后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。 接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。 import numpy as np tasks = [] models = [] metrics = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(random_forest, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(logistic_regression, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): # fixed random_state params['random_state'] = 123 # 再指定并发为核的个数 params['n_jobs'] = n_cores task = mr.spawn(distributed_train_and_metric, args=(xgb, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # 把顺序打乱,目的是能分散到 worker 上平均一点 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute() 可以看到代码几乎一致。 运行查看结果: CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s 时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。 细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。 以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。 print(models[0].fetch_log()) 输出我们简略一部分。 building tree 1 of 50 building tree 2 of 50 building tree 3 of 50 building tree 4 of 50 building tree 5 of 50 building tree 6 of 50 # 中间省略 building tree 49 of 50 building tree 50 of 50 要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。 想要了解 fetch_log 接口,可以查看 文档。 还有更多 Mars Remote API 的能力其实不止这些,举个例子,在 remote 内部可以 spawn 新的函数;也可以调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容,读者们可以先行探索,后续我们再写别的文章介绍。 总结 Mars Remote API 通过并行和分布式 Python 函数,用很小的修改代价,极大提升了执行效率。 对 Mars 项目感兴趣的读者们,欢迎 star Github 项目,以及订阅我们的专栏。 Mars 开源地址:https://github.com/mars-project/mars Mars 团队专栏:https://zhuanlan.zhihu.com/mars-project 联系我们 除了可以在 Github Issues 和我们联系,也可以加入钉钉群 32697156 和我们交流。
Mars 简介 Mars 能利用并行和分布式技术,加速 Python 数据科学栈,包括 numpy、pandas 和 scikit-learn。新的 Remote API 能轻松并行执行 Python 函数。此外,也能轻松与 TensorFlow、PyTorch 和 XGBoost 集成。 Mars tensor 的接口和 numpy 保持一致,但支持大规模高维数组。样例代码如下。 import mars.tensor as mt a = mt.random.rand(10000, 50) b = mt.random.rand(50, 5000) a.dot(b).execute() Mars DataFrame 接口和 pandas 保持一致,但可以支撑大规模数据处理和分析。样例代码如下。 import mars.dataframe as md ratings = md.read_csv('Downloads/ml-20m/ratings.csv') movies = md.read_csv('Downloads/ml-20m/movies.csv') movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') result.sort_values(by='rating', ascending=False).execute() Mars learn 保持和 scikit-learn 接口一致。样例代码如下。 import mars.dataframe as md from mars.learn.neighbors import NearestNeighbors df = md.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df).fetch() Mars learn 可以很方便地与 TensorFlow、PyTorch 和 XGBoost 集成,点击链接查看文档。 Mars Remote 可以轻松并行和分布式执行一系列 Python 函数。样例代码如下: import numpy as np import mars.remote as mr def calc_chunk(n, i): rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs, N): return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] pi = mr.spawn(calc_pi, args=(fs, N)) print(pi.execute().fetch()) 这里 calc_chunk 被 spawn 了 20次,这20次调用可以分布到多核,或者到集群里并行执行。最后 Mars 会自动等待这20次调用结束后触发 calc_pi 的执行。 在 MaxCompute 上使用 Mars,我们提供了简单易用的接口来拉起 Mars 集群,用户不需要关心安装和维护集群。同时,通过 MaxCompute 拉起的 Mars,也支持直接读写 MaxCompute 表。 申请试用 目前我们采用申请开通的方式,公共云用户请提工单申请。 环境准备 要在 MaxCompute 上运行 Mars,需要有相应的运行环境。这可以分为如下几种情况。 开箱即用的环境,如 dataworks,会包含所需要的依赖。 其他环境,需要自己安装相关依赖。 我们分别展开。 开箱即用的环境 开箱即用的环境,如 dataworks 的 pyodps3 节点,已经包含了 PyODPS 和 Mars。 在新建的 pyodps3 节点里运行如下命令检查版本,确保满足要求。 from odps import __version__ as odps_version from mars import __version__ as mars_version print(odps_version) print(mars_version) 输出的第一个为 PyODPS 版本,第二个为 Mars 版本。要求 PyODPS 至少是 0.9.0 。 其他环境 这个环境就要求通过 pip 安装 PyODPS 和 Mars。Python 版本推荐使用 3.7 版本,至少需要是 3.5 版本。 通过如下命令安装: pip install -U pip # 可选,确保 pip 版本足够新 pip install pyarrow==0.12.1 # 目前 pyarrow 版本固定到 0.12.1 pip install pyodps>0.9.0 # pyodps 需要至少 0.9.0 pip install pymars>=0.4.0rc1 # mars 版本需要至少是 0.4.0rc1 准备 ODPS 入口 ODPS 入口是 MaxCompute 所有操作的基础: 对于开箱即用的环境,如 dataworks,我们会自动创建 o 即 ODPS 入口实例,因此可以不需要创建。 对于其他环境,需要通过 access_id、access_key 等参数创建,详细参考 文档。 基本概念 MaxCompute 任务实例:MaxCompute 上任务以 instance 概念存在。Mars 集群也是通过一个 MaxCompute Instance 拉起。 Logview 地址:每个 MaxCompute instance 包含一个 logview 地址来查看任务状态。拉起 Mars 集群的 instance 也不例外。 Mars UI: Mars 集群拉起后,会包含一个 Web UI,通过这个 Web UI,可以查看 Mars 集群、任务状态,可以提交任务。当集群拉起后,一般来说就不需要和 MaxCompute 任务实例交互了。 Mars session:Mars session 和具体的执行有关,一般情况下用户不需要关心 session,因为会包含默认的 session。通过 o.create_mars_cluster 创建了 Mars 集群后,会创建默认连接到 Mars 集群的 session。 Jupyter Notebook:Jupyter Notebook 是一个基于网页的用于交互式计算的应用程序,可以用来开发、文档编写、运行代码和展示结果。 基础用法 创建 Mars 集群 准备好环境后,接着我们就可以拉起 Mars 集群了。 有了 o 这个对象后,拉起 Mars 集群非常简单,只需要运行如下代码。 from odps import options options.verbose = True # 在 dataworks pyodps3 里已经设置,所以不需要前两行代码 client = o.create_mars_cluster(5, 4, 16, min_worker_num=3) 这个例子里指定了 worker 数量为 5 的集群,每个 worker 是4核、16G 内存的配置,min_worker_num 指当 worker 已经起了3个后,就可以返回 client 对象了,而不用等全部 5 个 worker 都启动再返回。Mars 集群的创建过程可能比较慢,需要耐心等待。 注意:申请的单个 worker 内存需大于 1G,CPU 核数和内存的最佳比例为 1:4,例如单 worker 4核、16G。同时,新建的 worker 个数也不要超过 30 个,否则会对镜像服务器造成压力,如果需要使用超过 30 个 worker,请工单申请。 这个过程中会打印 MaxCompute instance 的 logview、 Mars UI 以及 Notebook 地址。Mars UI 可以用来连接 Mars 集群,亦可以用来查看集群、任务状态。 Mars 集群的创建就是一个 MaxCompute 任务,因此也有 instance id、logview 等 MaxCompute 通用的概念。 提交作业 Mars 集群创建的时候会设置默认 session,通过 .execute() 执行时任务会被自动提交到集群。 import mars.dataframe as md import mars.tensor as mt md.DataFrame(mt.random.rand(10, 3)).execute() # execute 自动提交任务到创建的集群 停止并释放集群 目前一个 Mars 集群超过3天就会被自动释放。当 Mars 集群不再需要使用时,也可以通过调用 client.stop_server() 手动释放: client.stop_server() MaxCompute 表读写支持 创建了 Mars 集群后,集群内的 Mars 任务可以直读和直写 MaxCompute 表。 读表 通过 o.to_mars_dataframe 来读取 MaxCompute 表,并返回 Mars DataFrame。 In [1]: df = o.to_mars_dataframe('test_mars') In [2]: df.head(6).execute() Out[2]: col1 col2 0 0 0 1 0 1 2 0 2 3 1 0 4 1 1 5 1 2 写表 通过 o.persist_mars_dataframe(df, 'table_name') 将 Mars DataFrame 保存成 MaxCompute 表。 In [3]: df = o.to_mars_dataframe('test_mars') In [4]: df2 = df + 1 In [5]: o.persist_mars_dataframe(df2, 'test_mars_persist') # 保存 Mars DataFrame In [6]: o.get_table('test_mars_persist').to_df().head(6) # 通过 PyODPS DataFrame 查看数据 col1 col2 0 1 1 1 1 2 2 1 3 3 2 1 4 2 2 5 2 3 使用 Mars 集群自带的 Jupyter Notebook 创建 Mars 集群会自动创建一个 Jupyter Notebook 以编写代码。 新建一个 Notebook 会自动设置 session,提交任务到集群。因此在这个 notebook 内也不需要显示创建 session。 import mars.dataframe as md md.DataFrame(mt.random.rand(10, 3)).sum().execute() # 在 notebook 里运行,execute 自动提交到当前集群 有一点要注意:这个 notebook 不会保存你的 notebook 文件,所以要记得自行保存。 用户也可以使用自己的 notebook 连接到集群,此时参考 使用已经创建的 Mars 集群 章节。 其他用法 使用已经创建的 Mars 集群 首先,我们可以通过 instance id 重建 Mars 集群的 client。 client = o.create_mars_cluster(instance_id=**instance-id**) 如果只是想使用 Mars,可以使用 Mars session 来连接。给定 Mars UI 的地址。则: from mars.session import new_session new_session('**Mars UI address**').as_default() # 设置为默认 session 获取 Mars UI 地址 Mars 集群创建的时候指定了 options.verbose=True 会打印 Mars UI 地址。 也可以通过 client.endpoint 来获取 Mars UI。 print(client.endpoint) 获取 Logview 地址 创建集群的时候指定了 options.verbose=True 会自动打印 logview。 也可以通过 client.get_logview_address() 获取 logview 地址。 print(client.get_logview_address()) 获取 Jupyter Notebook 地址 Mars 集群创建的时候指定了 options.verbose=True 会打印 Jupyter Notebook 地址。 也可以通过 client.get_notebook_endpoint() 获取 Jupyter Notebook 地址。 print(client.get_notebook_endpoint()) Mars 和 PyODPS DataFrame 对比 有同学会问,Mars 和 PyODPS DataFrame 有什么区别呢? API Mars DataFrame 的接口完全兼容 pandas。除了 DataFrame,Mars tensor 兼容 numpy,Mars learn 兼容 scikit-learn。 而 PyODPS 只有 DataFrame 接口,和 pandas 的接口存在着很多不同。 索引 Mars DataFrame 有 pandas 索引的概念。 In [1]: import mars.dataframe as md In [5]: import mars.tensor as mt In [7]: df = md.DataFrame(mt.random.rand(10, 3), index=md.date_range('2020-5-1', periods=10)) In [9]: df.loc['2020-5'].execute() Out[9]: 0 1 2 2020-05-01 0.061912 0.507101 0.372242 2020-05-02 0.833663 0.818519 0.943887 2020-05-03 0.579214 0.573056 0.319786 2020-05-04 0.476143 0.245831 0.434038 2020-05-05 0.444866 0.465851 0.445263 2020-05-06 0.654311 0.972639 0.443985 2020-05-07 0.276574 0.096421 0.264799 2020-05-08 0.106188 0.921479 0.202131 2020-05-09 0.281736 0.465473 0.003585 2020-05-10 0.400000 0.451150 0.956905 PyODPS 里没有索引的概念,因此跟索引有关的操作全部都不支持。 数据顺序 Mars DataFrame 一旦创建,保证顺序,因此一些时序操作比如 shift,以及向前向后填空值如ffill、bfill ,只有 Mars DataFrame 支持。 In [3]: df = md.DataFrame([[1, None], [None, 1]]) In [4]: df.execute() Out[4]: 0 1 0 1.0 NaN 1 NaN 1.0 In [5]: df.ffill().execute() # 空值用上一行的值 Out[5]: 0 1 0 1.0 NaN 1 1.0 1.0 PyODPS 由于背后使用 MaxCompute 计算和存储数据,而 MaxCompute 并不保证数据顺序,所以这些操作在 MaxCompute 上都无法支持。 执行层 PyODPS 本身只是个客户端,不包含任何服务端部分。PyODPS DataFrame 在真正执行时,会将计算编译到 MaxCompute SQL 执行。因此,PyODPS DataFrame 支持的操作,取决于 MaxCompute SQL 本身。此外,每一次调用 execute 方法时,会提交一次 MaxCompute 作业,需要在集群内调度。 Mars 本身包含客户端和分布式执行层。通过调用 o.create_mars_cluster ,会在 MaxCompute 内部拉起 Mars 集群,一旦 Mars 集群拉起,后续的交互就直接和 Mars 集群进行。计算会直接提交到这个集群,调度开销极小。在数据规模不是特别大的时候,Mars 应更有优势。 使用场景指引 有同学会关心,何时使用 Mars,何时使用 PyODPS DataFrame?我们分别阐述。 适合 Mars 的使用场景。 如果你经常使用 PyODPS DataFrame 的 to_pandas() 方法,将 PyODPS DataFrame 转成 pandas DataFrame,推荐使用 Mars DataFrame。 Mars DataFrame 目标是完全兼容 pandas 的接口以及行为,如果你熟悉 pandas 的接口,而不愿意学习 PyODPS DataFrame 的接口,那么使用 Mars。 Mars DataFrame 因为兼容 pandas 的行为,因此如下的特性如果你需要用到,那么使用 Mars。 Mars DataFrame 包含行和列索引,如果需要使用索引,使用 Mars。 Mars DataFrame 创建后会保证顺序,通过 iloc 等接口可以获取某个偏移的数据。如 df.iloc[10] 可以获取第10行数据。此外,如 df.shift() 、df.ffill() 等需要有保证顺序特性的接口也在 Mars DataFrame 里得到了实现,有这方面的需求可以使用 Mars。 Mars 还包含 Mars tensor 来并行和分布式化 Numpy,以及 Mars learn 来并行和分布式化 scikit-learn、以及支持在 Mars 集群里分布式运行 TensorFlow、PyTorch 和 XGBoost。有这方面的需求使用 Mars。 Mars 集群一旦创建,后续不再需要通过 MaxCompute 调度,任务可以直接提交到 Mars 集群执行;此外,Mars 对于中小型任务(数据量 T 级别以下),会有较好的性能。这些情况可以使用 Mars。 适合 PyODPS DataFrame 的使用场景 PyODPS DataFrame 会把 DataFrame 任务编译成 MaxCompute SQL 执行,如果希望依托 MaxCompute 调度任务,使用 PyODPS DataFrame。 PyODPS DataFrame 会编译任务到 MaxCompute 执行,由于 MaxCompute 相当稳定,而 Mars 相对比较新,如果对稳定性有很高要求,那么使用 PyODPS DataFrame。 数据量特别大(T 级别以上),使用 PyODPS DataFrame。 Mars 参考文档 Mars 开源地址:https://github.com/mars-project/mars Mars 文档:https://docs.pymars.org/zh_CN/latest/ Mars 团队专栏:https://zhuanlan.zhihu.com/mars-project 技术支持 技术支持请加 PyODPS 钉钉群:11701793 FAQ Q:一个用户创建的 Mars 集群,别人能不能用。 A:可以,参考 使用已经创建的 Mars 集群 章节。
文章原载于 Mars 团队专栏,欢迎关注。 从这篇文章开始,我们开始一个新的读 paper 系列。 今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。 个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。 这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分: 什么是真正的 DataFrame? 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。 从 Mars DataFrame 的角度来看这个问题。 什么是真正的 DataFrame? 起源 最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data frame" 在 1990 年就发布了,书《S 语言统计模型》第3章里详述了它的概念,书里着重强调了 dataframe 的矩阵起源。 书中描述 DataFrame 看上去很像矩阵,且支持类似矩阵的操作;同时又很像关系表。 R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。这些 DataFrame 都同宗同源,有着相同的语义和数据模型。 DataFrame 数据模型 DataFrame 的需求来源于把数据看成矩阵和表。但是,矩阵中只包含一种数据类型,未免过于受限;同时,关系表要求数据必须要首先定义 schema。对于 DataFrame 来说,它的列类型可以在运行时推断,并不需要提前知晓,也不要求所有列都是一个类型。因此,DataFrame 可以理解成是关系系统、矩阵、甚至是电子表格程序(典型如 Excel)的合体。 跟关系系统相比,DataFrame 有几个特别有意思的属性,让 DataFrame 因此独一无二。 保证顺序,行列对称 首先,无论在行还是列方向上,DataFrame 都是有顺序的;且行和列都是一等公民,不会区分对待。 拿 pandas 举例子,当创建了一个 DataFrame 后,无论行和列上数据都是有顺序的,因此,在行和列上都可以使用位置来选择数据。 In [1]: import pandas as pd In [2]: import numpy as np In [3]: df = pd.DataFrame(np.random.rand(5, 4)) In [4]: df Out[4]: 0 1 2 3 0 0.736385 0.271232 0.940270 0.926548 1 0.319533 0.891928 0.471176 0.583895 2 0.440825 0.500724 0.402782 0.109702 3 0.300279 0.483571 0.639299 0.778849 4 0.341113 0.813870 0.054731 0.059262 In [5]: df.iat[2, 2] # 第二行第二列元素 Out[5]: 0.40278182653648853 因为行和列的对称关系,因此聚合函数在两个方向上都可以计算,只需指定 axis 即可。 In [6]: df.sum() # 默认 axis == 0,在行方向上做聚合,因此结果是4个元素 Out[6]: 0 2.138135 1 2.961325 2 2.508257 3 2.458257 dtype: float64 In [7]: df.sum(axis=1) # axis == 1,在列方向上做聚合,因此是5个元素 Out[7]: 0 2.874434 1 2.266533 2 1.454032 3 2.201998 4 1.268976 dtype: float64 如果熟悉 numpy(数值计算库,包含多维数组和矩阵的定义),可以看到这个特性非常熟悉,从而可以看出 DataFrame 的矩阵本质。 丰富的 API DataFrame 的 API 非常丰富,横跨关系(如 filter、join)、线性代数(如 transpose、dot)以及类似电子表格(如 pivot)的操作。 还是以 pandas 为例,一个 DataFrame 可以做转置操作,让行和列对调。 In [8]: df.transpose() Out[8]: 0 1 2 3 4 0 0.736385 0.319533 0.440825 0.300279 0.341113 1 0.271232 0.891928 0.500724 0.483571 0.813870 2 0.940270 0.471176 0.402782 0.639299 0.054731 3 0.926548 0.583895 0.109702 0.778849 0.059262 直观的语法,适合交互式分析 用户可以对 DataFrame 数据不断进行探索,查询结果可以被后续的结果复用,可以非常方便地用编程的方式组合非常复杂的操作,很适合交互式的分析。 列中允许异构数据 DataFrame 的类型系统允许一列中有异构数据的存在,比如,一个 int 列中允许有 string 类型数据存在,它可能是脏数据。这点看出 DataFrame 非常灵活。 In [10]: df2 = df.copy() In [11]: df2.iloc[0, 0] = 'a' In [12]: df2 Out[12]: 0 1 2 3 0 a 0.271232 0.940270 0.926548 1 0.319533 0.891928 0.471176 0.583895 2 0.440825 0.500724 0.402782 0.109702 3 0.300279 0.483571 0.639299 0.778849 4 0.341113 0.813870 0.054731 0.059262 数据模型 现在我们可以对什么是真正的 DataFrame 正式下定义: DataFrame 由二维混合类型的数组、行标签、列标签、以及类型(types 或者 domains)组成。在每列上,这个类型是可选的,可以在运行时推断。从行上看,可以把 DataFrame 看做行标签到行的映射,且行之间保证顺序;从列上看,可以看做列类型到列标签到列的映射,同样,列间同样保证顺序。 行标签和列标签的存在,让选择数据时非常方便。 In [13]: df.index = pd.date_range('2020-4-15', periods=5) In [14]: df.columns = ['c1', 'c2', 'c3', 'c4'] In [15]: df Out[15]: c1 c2 c3 c4 2020-04-15 0.736385 0.271232 0.940270 0.926548 2020-04-16 0.319533 0.891928 0.471176 0.583895 2020-04-17 0.440825 0.500724 0.402782 0.109702 2020-04-18 0.300279 0.483571 0.639299 0.778849 2020-04-19 0.341113 0.813870 0.054731 0.059262 In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3'] # 注意这里的切片是闭区间 Out[16]: c2 c3 2020-04-16 0.891928 0.471176 2020-04-17 0.500724 0.402782 2020-04-18 0.483571 0.639299 这里的 index 和 columns 就分别是行和列标签。我们可以很容易选择一段时间(行上选择)和几列(列上选择)数据。当然这些建立在数据是按顺序存储的基础上。 按顺序存储的特性让 DataFrame 非常适合用来做统计方面的工作。 In [17]: df3 = df.shift(1) # 把 df 的数据整体下移一格,行列索引保持不变 In [18]: df3 Out[18]: c1 c2 c3 c4 2020-04-15 NaN NaN NaN NaN 2020-04-16 0.736385 0.271232 0.940270 0.926548 2020-04-17 0.319533 0.891928 0.471176 0.583895 2020-04-18 0.440825 0.500724 0.402782 0.109702 2020-04-19 0.300279 0.483571 0.639299 0.778849 In [19]: df - df3 # 数据减法会自动按标签对齐,因此这一步可以用来计算环比 Out[19]: c1 c2 c3 c4 2020-04-15 NaN NaN NaN NaN 2020-04-16 -0.416852 0.620697 -0.469093 -0.342653 2020-04-17 0.121293 -0.391205 -0.068395 -0.474194 2020-04-18 -0.140546 -0.017152 0.236517 0.669148 2020-04-19 0.040834 0.330299 -0.584568 -0.719587 In [21]: (df - df3).bfill() # 第一行的空数据按下一行填充 Out[21]: c1 c2 c3 c4 2020-04-15 -0.416852 0.620697 -0.469093 -0.342653 2020-04-16 -0.416852 0.620697 -0.469093 -0.342653 2020-04-17 0.121293 -0.391205 -0.068395 -0.474194 2020-04-18 -0.140546 -0.017152 0.236517 0.669148 2020-04-19 0.040834 0.330299 -0.584568 -0.719587 从例子看到,正因为数据是按顺序存放的,因此我们可以索引保持不变,整体下移一行,这样,昨天的数据就到了今天的行上,然后拿原数据减去位移后的数据时,因为 DataFrame 会自动按标签做对齐,因此,对于一个日期,相当于用当天的数据减去了前天的数据,这样就可以做类似于环比的操作。这简直太方便了。试想,对于关系系统来说,恐怕需要想办法找一列作为 join 的条件,然后再做减法等等。最后,对于空数据,我们还可以填充上一行(ffill)或者下一行的数据(bfill)。想在关系系统里想达到同样效果,想必是需要大费周章的。 DataFrame 的真正含义正在被杀死 近几年,DataFrame 系统如同雨后春笋般出现,然而,这其中的绝大多数系统只包含了关系表的语义,并不包含我们之前说的矩阵方面的意义,且它们大多也并不保证数据顺序,因此真正 DataFrame 所拥有的统计和机器学习方面的特质也不复存在。这些 “DataFrame” 系统的出现,让 “DataFrame” 这个词本身几乎变得没有意义。数据科学家们为了处理大规模的数据,思维方式不得不作出改变,这其中必然存在风险。 Spark DataFrame 和 Koalas 不是真正的 DataFrame 这些 DataFrame 系统的代表是 Spark DataFrame, Spark 当然是伟大的,它解决了数据规模的问题;同时又首次把 ”DataFrame“ 的概念带到了大数据的领域。但其实它只是 spark.sql的另一种形式(当然 Spark DataFrame 确实在 spark.sql 下)。Spark DataFrame 只包含了关系表的语义,schema 需要确定,数据也并不保证顺序。 那么会有同学说 Koalas 呢?Koalas 提供了 pandas API,用 pandas 的语法就可以在 spark 上分析了。实际上,因为 Koalas 也是将 pandas 的操作转成 Spark DataFrame 来执行,因为 Spark DataFrame 内核本身的特性,注定 Koalas 只是看上去和 pandas 一致。 为了说明这点,我们使用 数据集(Hourly Ridership by Origin-Destination Pairs),只取 2019 年的数据。 对于 pandas,我们按天聚合,并按 30 天滑动窗口来计算平均值。 In [22]: df = pd.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', ...: names=['Date','Hour','Origin','Destination','Trip Count']) In [23]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() Out[23]: <matplotlib.axes._subplots.AxesSubplot at 0x118077d90> 如果是 Koalas,因为它的 API 看上去和 pandas 一致,因此,我们按照 Koalas 的文档做 import 替换。 In [1]: import databricks.koalas as ks In [2]: df = ks.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', names=['Date','Hour','Origin','Destination','Trip Count']) In [3]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() 然后令人惊讶的是,结果并不一致。大费周章后才查到,原因是顺序问题,聚合的结果后并不保证排序,因此要得到一样的结果需要在 rolling 前加 sort_index(),确保 groupby 后的结果是排序的。 In [4]: df.groupby('Date').mean()['Trip Count'].sort_index().rolling(30).mean().plot() 默认的排序规则非常重要,这对以时间作为索引的数据尤其关键,而且这让数据科学家更容易观察数据,也更容易复现结果。 所以,在使用 Koalas 时请小心,要时刻关注你的数据在你心中是不是排序的,因为 Koalas 很可能表现地和你想的不一致。 让我们再看 shift,它能工作的一个前提就是数据是排序的,那么在 Koalas 中调用会发生什么呢? In [6]: df.shift(1) --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) /usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw) 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: /usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: Py4JJavaError: An error occurred while calling o110.select. : org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING))' due to data type mismatch: argument 1 requires (double or float) type, however, 'lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)' is of timestamp type.;; 'Project [__index_level_0__#41, CASE WHEN (isnull(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)))) THEN null ELSE lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Date#87, CASE WHEN (isnull(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Hour#88, CASE WHEN (isnull(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Origin#89, CASE WHEN (isnull(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Destination#90, CASE WHEN (isnull(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Trip Count#91, __natural_order__#50L] +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS __natural_order__#50L] +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34] +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, _w0#42L, _we0#43, (_we0#43 - 1) AS __index_level_0__#41] +- Window [row_number() windowspecdefinition(_w0#42L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#43], [_w0#42L ASC NULLS FIRST] +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS _w0#42L] +- Project [0#20 AS Date#30, 1#21 AS Hour#31, 2#22 AS Origin#32, 3#23 AS Destination#33, 4#24 AS Trip Count#34] +- Project [_c0#10 AS 0#20, _c1#11 AS 1#21, _c2#12 AS 2#22, _c3#13 AS 3#23, _c4#14 AS 4#24] +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:116) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:296) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at ... # 后面还有一大段报错信息,此处省略 这个报错可能会让数据科学家们震惊,什么,我就做了个 shift 啊,报错里掺杂着 Java 异常栈和一大堆看不懂的错误。 这里真正的错误和 Date 是时间戳有关,那么我们只取 int 类型的字段做 shift 总可以了吧。 In [10]: df['Hour'].shift(1) Out[10]: 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 确实可以运行,但却看到一句话,大意是数据会被放到一个分区来执行,这正是因为数据本身之间并不保证顺序,因此只能把数据收集到一起,排序,再调用 shift。这样就不再是一个分布式的程序了,甚至比 pandas 本身更慢。 如 DataFrame.dot 等矩阵相关的操作在 Koalas 里也不包含,这些操作已经很难用关系代数来表达了。 PyODPS DataFrame 相信用过 MaxCompute(原名 ODPS,阿里云自研的大数据系统),应该会听说过 PyODPS。这个库是我们前几年的产品,PyODPS 里也包含一个 DataFrame,而 PyODPS DataFrame 在执行的时候会被编译到 ODPS SQL 来执行。 提 PyODPS DataFrame 的原因是,我们在几年前就发现,虽然它提供了 pandas-like 的接口,一定程度上让用户能用类似 pandas 的思维解决问题,然而,当用户问我们,如何向后填充数据?如何通过索引获取数据?答案都是不能。原因也是一样的,因为 PyODPS DataFrame 只是将计算代理给不保证有序、只有关系代数算子的引擎来执行。 如果系统本身的数据模型不是真正的 DataFrame 模型,仅仅让接口看起来像是远远不够的。 Mars DataFrame 因此这里要说到 Mars DataFrame,其实我们做 Mars 的初衷和这篇 paper 的想法是一致的,因为现有的系统虽然能很好地解决规模问题,但那些传统数据科学包中好的部分却被人遗忘了,我们希望 Mars 能保留这些库中好的部分,又能解决规模问题,也能充分利用新硬件。 Mars DataFrame 会自动将 DataFrame 分割成很多小的 chunk,每个 chunk 也是一个 DataFrame,而无论是 chunk 间还是 chunk 内的数据,都保证顺序。 图里的示例中,一个行数 380、列数 370 的 DataFrame,被 Mars 分成 3x3 一共 9 个 chunk,根据计算在 CPU 还是 NVIDIA GPU 上进行,用 pandas DataFrame 或者 cuDF DataFrame 来存储数据和执行真正的计算。可以看到,Mars 既会在行上,也会在列上进行分割,这种在行上和列上的对等性,让 DataFrame 的矩阵本质能得以发挥。 在单机真正执行时,根据初始数据的位置,Mars 会自动把数据分散到多核或者多卡执行;对于分布式,会将计算分散到多台机器执行。 Mars DataFrame 保留了行标签、列标签和类型的概念。因此能够想象如同 pandas 一样,可以在比较大的数据集上根据标签进行筛选。 In [1]: import mars.dataframe as md In [2]: import mars.tensor as mt In [8]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000), ...: index=md.date_range('2020-1-1', periods=10000)) In [9]: df.loc['2020-4-15'].execute() Out[9]: 0 0.622763 1 0.446635 2 0.007870 3 0.107846 4 0.288893 5 0.219340 6 0.228806 7 0.969435 8 0.033130 9 0.853619 Name: 2020-04-15 00:00:00, dtype: float64 Mars 会保持和 pandas 一致的排序特性,因此对于 groupby 等操作,无需担心结果和所想不一致。 In [6]: import mars.dataframe as md In [7]: df = md.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', n ...: ames=['Date','Hour','Origin','Destination','Trip Count']) In [8]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() # 结果正确 Out[8]: <matplotlib.axes._subplots.AxesSubplot at 0x11ff8ab90> 对于 shift,不光结果正确,而且执行时能利用多核、多卡和分布式的能力。 In [3]: df.shift(1).head(10).execute() Out[3]: Date Hour Origin Destination Trip Count 0 NaN NaN NaN NaN NaN 1 2019-01-01 0.0 12TH 16TH 4.0 2 2019-01-01 0.0 12TH ANTC 1.0 3 2019-01-01 0.0 12TH BAYF 1.0 4 2019-01-01 0.0 12TH CIVC 2.0 5 2019-01-01 0.0 12TH COLM 1.0 6 2019-01-01 0.0 12TH COLS 1.0 7 2019-01-01 0.0 12TH CONC 1.0 8 2019-01-01 0.0 12TH DALY 1.0 9 2019-01-01 0.0 12TH DELN 2.0 不只是 DataFrame Mars 还包含 tensor 模块来支持并行和分布式化 numpy,以及 learn 模块来并行和分布式化 scikit-learn,因此可以想象,如 mars.tensor.linalg.svd 可以直接作用在 Mars DataFrame 上,这就赋予了 Mars 超越 DataFrame 本身的语义。 In [1]: import mars.dataframe as md In [2]: import mars.tensor as mt In [3]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000)) In [5]: mt.linalg.svd(df).execute() 总结 《Towards Scalable DataFrame Systems》赋予了 DataFrame 学术定义。而要做到可扩展的 DataFrame,首先必须是真正的 DataFrame,其次才是可扩展。 在我们看来,Mars 是真正的 DataFrame,它生来目标就是可扩展,而 Mars 又不仅仅是 DataFrame。在我们看来,Mars 在数据科学领域大有可为。 Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。 参考 Towards Scalable Dataframe Systems:https://arxiv.org/abs/2001.00888 Preventing the Death of the DataFrame:https://towardsdatascience.com/preventing-the-death-of-the-dataframe-8bca1c0f83c8 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
背景 在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。 Numpy Numpy 是数值计算的基础包,内部提供了多维数组(ndarray)这样一个数据结构,用户可以很方便地在任意维度上进行数值计算。 我们举一个蒙特卡洛方法求解 Pi 的例子。这背后的原理非常简单,现在我们有个半径为1的圆和边长为2的正方形,他们的中心都在原点。现在我们生成大量的均匀分布的点,让这些点落在正方形内,通过简单的推导,我们就可以知道,Pi 的值 = 落在圆内的点的个数 / 点的总数 * 4。 这里要注意,就是随机生成的点的个数越多,结果越精确。 用 Numpy 实现如下: import numpy as np N = 10 ** 7 # 1千万个点 data = np.random.uniform(-1, 1, size=(N, 2)) # 生成1千万个x轴和y轴都介于-1和1间的点 inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum() # 计算到原点的距离小于1的点的个数 pi = 4 * inside / N print('pi: %.5f' % pi) 可以看到,用 Numpy 来进行数值计算非常简单,只要寥寥数行代码,而如果读者习惯了 Numpy 这种面相数组的思维方式之后,无论是代码的可读性还是执行效率都会有巨大提升。 pandas pandas 是一个强大的数据分析和处理的工具,它其中包含了海量的 API 来帮助用户在二维数据(DataFrame)上进行分析和处理。 pandas 中的一个核心数据结构就是 DataFrame,它可以简单理解成表数据,但不同的是,它在行和列上都包含索引(Index),要注意这里不同于数据库的索引的概念,它的索引可以这么理解:当从行看 DataFrame 时,我们可以把 DataFrame 看成行索引到行数据的这么一个字典,通过行索引,可以很方便地选中一行数据;列也同理。 我们拿 movielens 的数据集 作为简单的例子,来看 pandas 是如何使用的。这里我们用的是 Movielens 20M Dataset. import pandas as pd ratings = pd.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}) 通过一行简单的 pandas.read_csv 就可以读取 CSV 数据,接着按 userId 做分组聚合,求 rating 这列在每组的总和、平均、最大、最小值。 “食用“ pandas 的最佳方式,还是在 Jupyter notebook 里,以交互式的方式来分析数据,这种体验会让你不由感叹:人生苦短,我用 xx() scikit-learn scikit-learn 是一个 Python 机器学习包,提供了大量机器学习算法,用户不需要知道算法的细节,只要通过几个简单的 high-level 接口就可以完成机器学习任务。当然现在很多算法都使用深度学习,但 scikit-learn 依然能作为基础机器学习库来串联整个流程。 我们以 K-最邻近算法为例,来看看用 scikit-learn 如何完成这个任务。 import pandas as pd from sklearn.neighbors import NearestNeighbors df = pd.read_csv('data.csv') # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df) fit接口就是 scikit-learn 里最常用的用来学习的接口。可以看到整个过程非常简单易懂。 Mars——Numpy、pandas 和 scikit-learn 的并行和分布式加速器 Python 数据科学栈非常强大,但它们有如下几个问题: 现在是多核时代,这几个库里鲜有操作能利用得上多核的能力。 随着深度学习的流行,用来加速数据科学的新的硬件层出不穷,这其中最常见的就是 GPU,在深度学习前序流程中进行数据处理,我们是不是也能用上 GPU 来加速呢? 这几个库的操作都是命令式的(imperative),和命令式相对应的就是声明式(declarative)。命令式的更关心 how to do,每一个操作都会立即得到结果,方便对结果进行探索,优点是很灵活;缺点则是中间过程可能占用大量内存,不能及时释放,而且每个操作之间就被割裂了,没有办法做算子融合来提升性能;那相对应的声明式就刚好相反,它更关心 what to do,它只关心结果是什么,中间怎么做并没有这么关心,典型的声明式像 SQL、TensorFlow 1.x,声明式可以等用户真正需要结果的时候才去执行,也就是 lazy evaluation,这中间过程就可以做大量的优化,因此性能上也会有更好的表现,缺点自然也就是命令式的优点,它不够灵活,调试起来比较困难。 为了解决这几个问题,Mars 被我们开发出来,Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,充分利用多核和新的硬件。 Mars 的开发过程中,我们核心关注的几点包括: 我们希望 Mars 足够简单,只要会用 Numpy、pandas 或 scikit-learn 就会用 Mars。 避免重复造轮子,我们希望能利用到这些库已有的成果,只需要能让他们被调度到多核/多机上即可。 声明式和命令式兼得,用户可以在这两者之间自由选择,灵活度和性能兼而有之。 足够健壮,生产可用,能应付各种 failover 的情况。 当然这些是我们的目标,也是我们一直努力的方向。 Mars tensor:Numpy 的并行和分布式加速器 上面说过,我们的目标之一是,只要会用 Numpy 等数据科学包,就会用 Mars。我们直接来看代码,还是以蒙特卡洛为例。变成 Mars 的代码是什么样子呢? import mars.tensor as mt N = 10 ** 10 data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = (4 * inside / N).execute() print('pi: %.5f' % pi) 可以看到,区别就只有两处:import numpy as np 变成 import mars.tensor as mt ,后续的 np. 都变成 mt. ;pi 在打印之前调用了一下 .execute() 方法。 也就是默认情况下,Mars 会按照声明式的方式,代码本身移植的代价极低,而在真正需要一个数据的时候,通过 .execute() 去触发执行。这样能最大限度得优化性能,以及减少中间过程内存消耗。 这里,我们还将数据的规模扩大了 1000 倍,来到了 100 亿个点。之前 1/1000 的数据量的时候,在我的笔记本上需要 757ms;而现在数据扩大一千倍,光 data 就需要 150G 的内存,这用 Numpy 本身根本无法完成。而使用 Mars,计算时间只需要 3min 44s,而峰值内存只需要 1G 左右。假设我们认为内存无限大,Numpy 需要的时间也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,并且通过声明式的方式,极大减少了中间内存占用。 前面说到,我们试图让声明式和命令式兼得,而使用命令式的风格,只需要在代码的开始配置一个选项即可。 import mars.tensor as mt from mars.config import options options.eager_mode = True # 打开 eager mode 后,每一次调用都会立即执行,行为和 Numpy 就完全一致 N = 10 ** 7 data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.linalg.norm(data, axis=1) < 1).sum() pi = 4 * inside / N # 不需要调用 .execute() 了 print('pi: %.5f' % pi.fetch()) # 目前需要 fetch() 来转成 float 类型,后续我们会加入自动转换 Mars DataFrame:pandas 的并行和分布式加速器 看过怎么样轻松把 Numpy 代码迁移到 Mars tensor ,想必读者也知道怎么迁移 pandas 代码了,同样也只有两个区别。我们还是以 movielens 的代码为例。 import mars.dataframe as md ratings = md.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute() Mars Learn:scikit-learn 的并行和分布式加速器 Mars Learn 也同理,这里就不做过多阐述了。但目前 Mars learn 支持的 scikit-learn 算法还不多,我们也在努力移植的过程中,这需要大量的人力和时间,欢迎感兴趣的同学一起参与。 import mars.dataframe as md from mars.learn.neighbors import NearestNeighbors df = md.read_csv('data.csv') # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) # 这里 fit 的时候也会整体触发执行,因此机器学习的高层接口都是立即执行的 neighbors = nn.kneighbors(df).fetch() # kneighbors 也已经触发执行,只需要 fetch 数据 这里要注意的是,对于机器学习的 fit、predict 等高层接口,Mars Learn 也会立即触发执行,以保证语义的正确性。 RAPIDS:GPU 上的数据科学 相信细心的观众已经发现,GPU 好像没有被提到。不要着急,这就要说到 RAPIDS。 在之前,虽然 CUDA 已经将 GPU 编程的门槛降到相当低的一个程度了,但对于数据科学家们来说,在 GPU 上处理 Numpy、pandas 等能处理的数据无异于天方夜谭。幸运的是,NVIDIA 开源了 RAPIDS 数据科学平台,它和 Mars 的部分思想高度一致,即使用简单的 import 替换,就可以将 Numpy、pandas 和 scikit-learn 的代码移植到 GPU 上。 其中,RAPIDS cuDF 用来加速 pandas,而 RAPIDS cuML 用来加速 scikit-learn。 对于 Numpy 来说,CuPy 已经很好地支持用 GPU 来加速了,这样 RAPIDS 也得以把重心放在数据科学的其他部分。 CuPy:用 GPU 加速 Numpy 还是蒙特卡洛求解 Pi。 import cupy as cp N = 10 ** 7 data = cp.random.uniform(-1, 1, size=(N, 2)) inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = 4 * inside / N print('pi: %.5f' % pi) 在我的测试中,它将 CPU 的 757ms,降到只有 36ms,提升超过 20 倍,可以说效果非常显著。这正是得益于 GPU 非常适合计算密集型的任务。 RAPIDS cuDF:用 GPU 加速 pandas 将 import pandas as pd 替换成 import cudf,GPU 内部如何并行,CUDA 编程这些概念,用户都不再需要关心。 import cudf ratings = cudf.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}) 运行时间从 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超过 10 倍。 RAPIDS cuML:用 GPU 加速 scikit-learn 同样是 k-最邻近问题。 import cudf from cuml.neighbors import NearestNeighbors df = cudf.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df) 运行时间从 CPU 上 1min52s,提升到 GPU 上 17.8s。 Mars 和 RAPIDS 结合能带来什么? RAPIDS 将 Python 数据科学带到了 GPU,极大地提升了数据科学的运行效率。它们和 Numpy 等一样,是命令式的。通过和 Mars 结合,中间过程将会使用更少的内存,这使得数据处理量更大;Mars 也可以将计算分散到多机多卡,以提升数据规模和计算效率。 在 Mars 里使用 GPU 也很简单,只需要在对应函数上指定 gpu=True。例如创建 tensor、读取 CSV 文件等都适用。 import mars.tensor as mt import mars.dataframe as md a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True) df = md.read_csv('ml-20m/ratings.csv', gpu=True) 下图是用 Mars 分别在 Scale up 和 Scale out 两个维度上加速蒙特卡洛计算 Pi 这个任务。一般来说,我们要加速一个数据科学任务,可以有这两种方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的内存、使用 GPU 替代 CPU等;Scale out 就是指用更多的机器,用分布式的方式提升效率。 可以看到在一台 24 核的机器上,Mars 计算需要 25.8s,而通过分布式的方式,使用 4 台 24 核的机器的机器几乎以线性的时间提升。而通过使用一个 NVIDIA TESLA V100 显卡,我们就能将单机的运行时间提升到 3.98s,这已经超越了4台 CPU 机器的性能。通过再将单卡拓展到多卡,时间进一步降低,但这里也可以看到,时间上很难再线性扩展了,这是因为 GPU 的运行速度提升巨大,这个时候网络、数据拷贝等的开销就变得明显。 性能测试 我们使用了 https://github.com/h2oai/db-benchmark 的数据集,测试了三个数据规模的 groupby 和 一个数据规模的 join。而我们主要对比了 pandas 和 DASK。DASK 和 Mars 的初衷很类似,也是试图并行和分布式化 Python 数据科学,但它们的设计、实现、分布式都存在较多差异,这个后续我们再撰文进行详细对比。 测试机器配置是 500G 内存、96 核、NVIDIA V100 显卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 执行计算。 Groupby 数据有三个规模,分别是 500M、5G 和 20G。 查询也有三组。 查询一 df = read_csv('data.csv') df.groupby('id1').agg({'v1': 'sum'}) 查询二 df = read_csv('data.csv') df.groupby(['id1', 'id2']).agg({'v1': 'sum'}) 查询三 df = read_csv('data.csv') df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) 数据大小 500M,性能结果 数据大小 5G,性能结果 数据大小 20G,性能结果 数据大小到 20G 时,pandas 在查询2会内存溢出,得不出结果。 可以看到,随着数据增加,Mars 的性能优势会愈发明显。 得益于 GPU 的计算能力,GPU 运算性能相比于 CPU 都有数倍的提升。如果单纯使用 RAPIDS cuDF,由于显存大小的限制,数据来到 5G 都难以完成,而由于 Mars 的声明式的特点,中间过程对显存的使用大幅得到优化,所以整组测试来到 20G 都能轻松完成。这正是 Mars + RAPIDS 所能发挥的威力。 Join 测试查询: x = read_csv('x.csv') y = read_csv('y.csv') x.merge(y, on='id1') 测试数据 x 为500M,y 包含10行数据。 总结 RAPIDS 将 Python 数据科学带到了 GPU,极大提升了数据分析和处理的效率。Mars 的注意力更多放在并行和分布式。相信这两者的结合,在未来会有更多的想象空间。 Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
本月,Mars 发布了 0.4.0b1 ,0.4.0b2 和 0.3.2 以及 0.3.3,点击链接查看详细的 Release Notes。本月两次发布版本是特殊情况,0.4.0b2 修复了 0.4.0b1 中比较紧急的问题。 Mars 项目发布周期 这里先简述下 Mars 的版本发布周期。Mars 以一个月为发布周期,采用双版本发布策略,一般会同时发布 Pre-release 版本和正式版。Pre-release 版本里会包含更多激进的功能或改动,可能会不稳定,而开发中我们认为稳定的功能或增强会被同步到正式版里。 查看 Github 项目的 milestones 可以看到最新的 Pre-release 和正式版本。 查看 Github Projects 页面 可以看到归类的 issues 和 PRs。 v0.4 Release 是我们按版本归档的进行中的 issues 和 PRs。其他则是按模块划分。 新版本功能 Highlight 新版本我们花了大量时间来完善 DataFrame API,经过这个版本的努力,pandas 中的一些常见的接口都得到了支持。 更完善的聚合和分组聚合 #1030 让 Groupby.aggregate 支持传入多个聚合函数。 #1054 支持了 DataFrame.aggregate 和 Series.aggregate。 #1019 和 #1069 支持了 cummax 等累积计算。 举个例子,在 pandas 中我们可以对 movielens 的数据 执行如下操作: In [1]: import pandas as pd In [2]: %%time ...: df = pd.read_csv('Downloads/ml-20m/ratings.csv') ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']}) ...: ...: CPU times: user 5.41 s, sys: 1.28 s, total: 6.7 s Wall time: 4.3 s Out[2]: rating max min mean std movieId 1 5.0 0.5 3.921240 0.889012 2 5.0 0.5 3.211977 0.951150 3 5.0 0.5 3.151040 1.006642 4 5.0 0.5 2.861393 1.095702 5 5.0 0.5 3.064592 0.982140 ... ... ... ... ... 131254 4.0 4.0 4.000000 NaN 131256 4.0 4.0 4.000000 NaN 131258 2.5 2.5 2.500000 NaN 131260 3.0 3.0 3.000000 NaN 131262 4.0 4.0 4.000000 NaN [26744 rows x 4 columns] 我们根据电影的 ID 进行聚合,求用户评价的最大、最小、平均值以及标准差。 使用 Mars 则可以: In [1]: import mars.dataframe as md In [2]: %%time ...: df = md.read_csv('Downloads/ml-20m/ratings.csv') ...: df.groupby('movieId').agg({'rating': ['max', 'min', 'mean', 'std']}).execute() ...: ...: CPU times: user 5.81 s, sys: 6.9 s, total: 12.7 s Wall time: 1.54 s Out[2]: rating max min mean std movieId 1 5.0 0.5 3.921240 0.889012 2 5.0 0.5 3.211977 0.951150 3 5.0 0.5 3.151040 1.006642 4 5.0 0.5 2.861393 1.095702 5 5.0 0.5 3.064592 0.982140 ... ... ... ... ... 131254 4.0 4.0 4.000000 NaN 131256 4.0 4.0 4.000000 NaN 131258 2.5 2.5 2.500000 NaN 131260 3.0 3.0 3.000000 NaN 131262 4.0 4.0 4.000000 NaN [26744 rows x 4 columns] 代码几乎一致,除了 Mars 需要通过 execute() 触发执行。 ratings.csv 有 500M+,使用 Mars 在我的笔记本上运行就可以有数倍加速。当数据量更大的时候,使用 Mars 还可以有更好的加速效果,如果单机无法胜任,也可以使用 Mars 分布式用一致的代码加速执行。 排序 #1053 支持了 sort_index。 #1046 支持了 sort_values。 还是以 movielens 数据 为例。 In [1]: import pandas as pd In [2]: %%time ...: ratings = pd.read_csv('Downloads/ml-20m/ratings.csv') ...: movies = pd.read_csv('Downloads/ml-20m/movies.csv') ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') ...: result.sort_values(by='rating', ascending=False) ...: ...: CPU times: user 5.17 s, sys: 1.13 s, total: 6.3 s Wall time: 4.05 s Out[2]: movieId rating title 19152 95517 5.0 Barchester Chronicles, The (1982) 21842 105846 5.0 Only Daughter (2013) 17703 89133 5.0 Boys (Drenge) (1977) 21656 105187 5.0 Linotype: The Film (2012) 21658 105191 5.0 Rocaterrania (2009) ... ... ... ... 26465 129784 0.5 Xuxa in Crystal Moon (1990) 18534 92479 0.5 Kisses for My President (1964) 26475 129834 0.5 Tom and Jerry: The Lost Dragon (2014) 24207 115631 0.5 Alone for Christmas (2013) 25043 119909 0.5 Sharpe's Eagle (1993) [26744 rows x 3 columns] 主要目标是将数据集中的电影按平均分从高到低进行排列。 到 Mars 这边,代码还是几乎一致。 In [1]: import mars.dataframe as md In [2]: %%time ...: ratings = md.read_csv('Downloads/ml-20m/ratings.csv') ...: movies = md.read_csv('Downloads/ml-20m/movies.csv') ...: movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) ...: result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') ...: result.sort_values(by='rating', ascending=False).execute() ...: ...: CPU times: user 4.97 s, sys: 6.01 s, total: 11 s Wall time: 1.39 s Out[2]: movieId rating title 19152 95517 5.0 Barchester Chronicles, The (1982) 21842 105846 5.0 Only Daughter (2013) 17703 89133 5.0 Boys (Drenge) (1977) 21656 105187 5.0 Linotype: The Film (2012) 21658 105191 5.0 Rocaterrania (2009) ... ... ... ... 26465 129784 0.5 Xuxa in Crystal Moon (1990) 18534 92479 0.5 Kisses for My President (1964) 26475 129834 0.5 Tom and Jerry: The Lost Dragon (2014) 24207 115631 0.5 Alone for Christmas (2013) 25043 119909 0.5 Sharpe's Eagle (1993) [26744 rows x 3 columns] Mars 的排序采用了并行正则采样排序算法,在我们的文章(链接)中已经做了介绍,这里不再赘述。 更完善的索引支持 Mars 在之前的版本中就支持了 iloc,现在我们也支持了其他的索引方法。 #1042 中支持了 loc。 #1101 中支持了 at 和 iat。 #1073 中支持了 md.date_range 方法。 通过 loc 的支持,使得基于索引的数据的查找更加方便。 In [1]: import mars.dataframe as md In [3]: import mars.tensor as mt In [8]: df = md.DataFrame(mt.random.rand(10000, 10), index=md.date_range('2000-1-1', periods=10000)) In [9]: df.loc['2020-3-25'].execute() Out[9]: 0 0.372354 1 0.139235 2 0.511007 3 0.102200 4 0.908454 5 0.144455 6 0.290627 7 0.248334 8 0.912666 9 0.830526 Name: 2020-03-25 00:00:00, dtype: float64 自定义函数、字符串和时间处理 #1038 增加了 apply 的支持。 #1063 支持了 md.Series.str 和 md.Series.dt来处理字符串和时间列。 我们可以利用 apply 来计算每个城市(数据集)到杭州(东经120°12′,北纬30°16′)的距离。 In [1]: import numpy as np In [2]: def haversine(lat1, lon1, lat2, lon2): ...: dlon = np.radians(lon2 - lon1) ...: dlat = np.radians(lat2 - lat1) ...: a = np.sin(dlat / 2) ** 2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2 ...: c = 2 * np.arcsin(np.sqrt(a)) ...: r = 6371 ...: return c * r ...: In [4]: import mars.dataframe as md In [5]: df = md.read_csv('Downloads/world-cities-database/worldcitiespop.csv', chunk_bytes='16M', dtype={'Region': object} ...: ) In [6]: df.execute(fetch=False) In [8]: df.apply(lambda r: haversine(r['Latitude'], r['Longitude'], 30.25, 120.17), result_type='reduce', axis=1).execute() Out[8]: 0 9789.135208 1 9788.270528 2 9788.270528 3 9788.270528 4 9789.307210 ... 248061 10899.720735 248062 11220.703197 248063 10912.645753 248064 11318.038981 248065 11141.080171 Length: 3173958, dtype: float64 移动窗口函数 #1045 增加了 rolling 移动窗口的支持。 移动窗口函数在金融领域使用频率很高,rolling 是在一个固定长度(也可能是固定的时间间隔)上进行一些聚合计算。以下是一个例子。 In [1]: import pandas_datareader.data as web In [2]: data = web.DataReader("^TWII", "yahoo", "2000-01-01","2020-03-25") In [3]: import mars.dataframe as md In [4]: df = md.DataFrame(data) In [5]: df.rolling(10, min_periods=1).mean().execute() Out[5]: High Low Open Close Volume Adj Close Date 2000-01-04 8803.610352 8642.500000 8644.910156 8756.549805 0.0 8756.517578 2000-01-05 8835.645020 8655.259766 8667.754883 8803.209961 0.0 8803.177734 2000-01-06 8898.426758 8714.809896 8745.356445 8842.816732 0.0 8842.784180 2000-01-07 8909.012451 8720.964844 8772.374756 8844.580078 0.0 8844.547607 2000-01-10 8952.413867 8755.129883 8806.285742 8896.183984 0.0 8896.151172 ... ... ... ... ... ... ... 2020-03-19 10423.317090 10083.132910 10370.730078 10180.533887 4149640.0 10180.533887 2020-03-20 10202.623047 9833.786914 10105.280078 9971.761914 4366130.0 9971.761914 2020-03-23 9983.399023 9611.036914 9885.659082 9763.000977 3990040.0 9763.000977 2020-03-24 9821.716016 9436.392969 9703.275098 9591.208984 3927690.0 9591.208984 2020-03-25 9685.129980 9290.444922 9543.636035 9466.308984 4003760.0 9466.308984 [4974 rows x 6 columns] 下一个版本计划 下一个版本会是 0.4.0rc1 和 0.3.4,我们仍然会专注提升 DataFrame API 的覆盖率和性能,提升稳定性,并增加文档。 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
相信大家对排序算法都非常熟悉了,快速排序、堆排序、归并排序等等。如果我们想在一个很大的数据集上进行排序,能利用上多核,甚至是分布式集群,有什么办法么? 本文就介绍一种并行排序算法:并行正则采样排序算法(Parallel Sorting by Regular Sampling),简称 PSRS 算法。 PSRS 算法过程 PSRS 算法的整个过程分为四步,如图所示,我们拆解开来讲。 现在假设我们有一个数组,有 48 个元素,现在数据被分成4份,即有4个分区。 阶段1,每个分区分别排序,并正则采样 我们对每个分区的数据调用快速排序,这样每个分区都是排好的数据。接着,我们从排好序的数据里正则采样4个数据。所谓正则,即有规律的,这里我们就每隔4个元素采样一个元素。 阶段2,归并采样数据,选出关键点 前面四个分区产生了4份采样数据,收集之,然后调用归并排序让他们有序。接着,我们从中选出 p - 1 (p 是分区个数)个关键点,这里还是正则采样的方式。 阶段3,数据分区 此时将关键点数据广播给每个分区,每个分区就可以根据关键点,将数据分成4份,使得每个数据落在各自的范围内。 阶段4,合并数据,归并排序 最后一个阶段是一个 shuffle 阶段,即每个下游都依赖前面的所有上游。此时每个分区将上游分好的数据收集起来,最终再进行一个归并排序。这样,我们最终的结果就是整体排序的了。 整个过程中,阶段1、阶段3 和 阶段4 可以并行。 MPI 的实现可以参考这里。 PSRS 算法在 Mars 中的应用 Mars 以并行和分布式化 Python 数据科学栈为目标,PSRS 算法能很好解决并行排序问题,因此,Mars 中和排序有关的操作都是基于 PSRS 算法实现的。 以张量排序为例。 首先我们通过 Numpy 创建 1 亿个元素的数组。 In [1]: import numpy as np In [2]: a = np.random.rand(1_0000_0000) In [3]: a.nbytes Out[3]: 800000000 我们来看看使用 Numpy 的排序需要多久。 In [4]: %time np.sort(a) CPU times: user 10.8 s, sys: 394 ms, total: 11.2 s Wall time: 9.4 s Out[4]: array([1.05764619e-10, 5.86309734e-09, 1.76225879e-08, ..., 9.99999976e-01, 9.99999983e-01, 9.99999998e-01]) 接着,我们来看看基于 PSRS 算法的 Mars tensor 排序需要多长时间。 In [10]: t = mt.tensor(a, chunk_size=1500_0000) In [12]: %time mt.sort(t).execute() CPU times: user 18.7 s, sys: 7.03 s, total: 25.7 s Wall time: 2.66 s 在我的笔记本上,可以看到 Numpy 的排序时长是 Mars 的 3.53 倍。 总结 本文介绍了并行正则排序算法,这个算法也在 Mars 项目里得到了广泛的使用。 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
背景 在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。 Numpy Numpy 是数值计算的基础包,内部提供了多维数组(ndarray)这样一个数据结构,用户可以很方便地在任意维度上进行数值计算。 我们举一个蒙特卡洛方法求解 Pi 的例子。这背后的原理非常简单,现在我们有个半径为1的圆和边长为2的正方形,他们的中心都在原点。现在我们生成大量的均匀分布的点,让这些点落在正方形内,通过简单的推导,我们就可以知道,Pi 的值 = 落在圆内的点的个数 / 点的总数 * 4。 这里要注意,就是随机生成的点的个数越多,结果越精确。 用 Numpy 实现如下: import numpy as np N = 10 ** 7 # 1千万个点 data = np.random.uniform(-1, 1, size=(N, 2)) # 生成1千万个x轴和y轴都介于-1和1间的点 inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum() # 计算到原点的距离小于1的点的个数 pi = 4 * inside / N print('pi: %.5f' % pi) 可以看到,用 Numpy 来进行数值计算非常简单,只要寥寥数行代码,而如果读者习惯了 Numpy 这种面相数组的思维方式之后,无论是代码的可读性还是执行效率都会有巨大提升。 pandas pandas 是一个强大的数据分析和处理的工具,它其中包含了海量的 API 来帮助用户在二维数据(DataFrame)上进行分析和处理。 pandas 中的一个核心数据结构就是 DataFrame,它可以简单理解成表数据,但不同的是,它在行和列上都包含索引(Index),要注意这里不同于数据库的索引的概念,它的索引可以这么理解:当从行看 DataFrame 时,我们可以把 DataFrame 看成行索引到行数据的这么一个字典,通过行索引,可以很方便地选中一行数据;列也同理。 我们拿 movielens 的数据集 作为简单的例子,来看 pandas 是如何使用的。这里我们用的是 Movielens 20M Dataset. import pandas as pd ratings = pd.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}) 通过一行简单的 pandas.read_csv 就可以读取 CSV 数据,接着按 userId 做分组聚合,求 rating 这列在每组的总和、平均、最大、最小值。 “食用“ pandas 的最佳方式,还是在 Jupyter notebook 里,以交互式的方式来分析数据,这种体验会让你不由感叹:人生苦短,我用 xx() scikit-learn scikit-learn 是一个 Python 机器学习包,提供了大量机器学习算法,用户不需要知道算法的细节,只要通过几个简单的 high-level 接口就可以完成机器学习任务。当然现在很多算法都使用深度学习,但 scikit-learn 依然能作为基础机器学习库来串联整个流程。 我们以 K-最邻近算法为例,来看看用 scikit-learn 如何完成这个任务。 import pandas as pd from sklearn.neighbors import NearestNeighbors df = pd.read_csv('data.csv') # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df) fit接口就是 scikit-learn 里最常用的用来学习的接口。可以看到整个过程非常简单易懂。 Mars——Numpy、pandas 和 scikit-learn 的并行和分布式加速器 Python 数据科学栈非常强大,但它们有如下几个问题: 现在是多核时代,这几个库里鲜有操作能利用得上多核的能力。 随着深度学习的流行,用来加速数据科学的新的硬件层出不穷,这其中最常见的就是 GPU,在深度学习前序流程中进行数据处理,我们是不是也能用上 GPU 来加速呢? 这几个库的操作都是命令式的(imperative),和命令式相对应的就是声明式(declarative)。命令式的更关心 how to do,每一个操作都会立即得到结果,方便对结果进行探索,优点是很灵活;缺点则是中间过程可能占用大量内存,不能及时释放,而且每个操作之间就被割裂了,没有办法做算子融合来提升性能;那相对应的声明式就刚好相反,它更关心 what to do,它只关心结果是什么,中间怎么做并没有这么关心,典型的声明式像 SQL、TensorFlow 1.x,声明式可以等用户真正需要结果的时候才去执行,也就是 lazy evaluation,这中间过程就可以做大量的优化,因此性能上也会有更好的表现,缺点自然也就是命令式的优点,它不够灵活,调试起来比较困难。 为了解决这几个问题,Mars 被我们开发出来,Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,充分利用多核和新的硬件。 Mars 的开发过程中,我们核心关注的几点包括: 我们希望 Mars 足够简单,只要会用 Numpy、pandas 或 scikit-learn 就会用 Mars。 避免重复造轮子,我们希望能利用到这些库已有的成果,只需要能让他们被调度到多核/多机上即可。 声明式和命令式兼得,用户可以在这两者之间自由选择,灵活度和性能兼而有之。 足够健壮,生产可用,能应付各种 failover 的情况。 当然这些是我们的目标,也是我们一直努力的方向。 Mars tensor:Numpy 的并行和分布式加速器 上面说过,我们的目标之一是,只要会用 Numpy 等数据科学包,就会用 Mars。我们直接来看代码,还是以蒙特卡洛为例。变成 Mars 的代码是什么样子呢? import mars.tensor as mt N = 10 ** 10 data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = (4 * inside / N).execute() print('pi: %.5f' % pi) 可以看到,区别就只有两处:import numpy as np 变成 import mars.tensor as mt ,后续的 np. 都变成 mt. ;pi 在打印之前调用了一下 .execute() 方法。 也就是默认情况下,Mars 会按照声明式的方式,代码本身移植的代价极低,而在真正需要一个数据的时候,通过 .execute() 去触发执行。这样能最大限度得优化性能,以及减少中间过程内存消耗。 这里,我们还将数据的规模扩大了 1000 倍,来到了 100 亿个点。之前 1/1000 的数据量的时候,在我的笔记本上需要 757ms;而现在数据扩大一千倍,光 data 就需要 150G 的内存,这用 Numpy 本身根本无法完成。而使用 Mars,计算时间只需要 3min 44s,而峰值内存只需要 1G 左右。假设我们认为内存无限大,Numpy 需要的时间也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,并且通过声明式的方式,极大减少了中间内存占用。 前面说到,我们试图让声明式和命令式兼得,而使用命令式的风格,只需要在代码的开始配置一个选项即可。 import mars.tensor as mt from mars.config import options options.eager_mode = True # 打开 eager mode 后,每一次调用都会立即执行,行为和 Numpy 就完全一致 N = 10 ** 7 data = mt.random.uniform(-1, 1, size=(N, 2)) inside = (mt.linalg.norm(data, axis=1) < 1).sum() pi = 4 * inside / N # 不需要调用 .execute() 了 print('pi: %.5f' % pi.fetch()) # 目前需要 fetch() 来转成 float 类型,后续我们会加入自动转换 Mars DataFrame:pandas 的并行和分布式加速器 看过怎么样轻松把 Numpy 代码迁移到 Mars tensor ,想必读者也知道怎么迁移 pandas 代码了,同样也只有两个区别。我们还是以 movielens 的代码为例。 import mars.dataframe as md ratings = md.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute() Mars Learn:scikit-learn 的并行和分布式加速器 Mars Learn 也同理,这里就不做过多阐述了。但目前 Mars learn 支持的 scikit-learn 算法还不多,我们也在努力移植的过程中,这需要大量的人力和时间,欢迎感兴趣的同学一起参与。 import mars.dataframe as md from mars.learn.neighbors import NearestNeighbors df = md.read_csv('data.csv') # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素 nn = NearestNeighbors(n_neighbors=10) nn.fit(df) # 这里 fit 的时候也会整体触发执行,因此机器学习的高层接口都是立即执行的 neighbors = nn.kneighbors(df).fetch() # kneighbors 也已经触发执行,只需要 fetch 数据 这里要注意的是,对于机器学习的 fit、predict 等高层接口,Mars Learn 也会立即触发执行,以保证语义的正确性。 RAPIDS:GPU 上的数据科学 相信细心的观众已经发现,GPU 好像没有被提到。不要着急,这就要说到 RAPIDS。 在之前,虽然 CUDA 已经将 GPU 编程的门槛降到相当低的一个程度了,但对于数据科学家们来说,在 GPU 上处理 Numpy、pandas 等能处理的数据无异于天方夜谭。幸运的是,NVIDIA 开源了 RAPIDS 数据科学平台,它和 Mars 的部分思想高度一致,即使用简单的 import 替换,就可以将 Numpy、pandas 和 scikit-learn 的代码移植到 GPU 上。 其中,RAPIDS cuDF 用来加速 pandas,而 RAPIDS cuML 用来加速 scikit-learn。 对于 Numpy 来说,CuPy 已经很好地支持用 GPU 来加速了,这样 RAPIDS 也得以把重心放在数据科学的其他部分。 CuPy:用 GPU 加速 Numpy 还是蒙特卡洛求解 Pi。 import cupy as cp N = 10 ** 7 data = cp.random.uniform(-1, 1, size=(N, 2)) inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum() pi = 4 * inside / N print('pi: %.5f' % pi) 在我的测试中,它将 CPU 的 757ms,降到只有 36ms,提升超过 20 倍,可以说效果非常显著。这正是得益于 GPU 非常适合计算密集型的任务。 RAPIDS cuDF:用 GPU 加速 pandas 将 import pandas as pd 替换成 import cudf,GPU 内部如何并行,CUDA 编程这些概念,用户都不再需要关心。 import cudf ratings = cudf.read_csv('ml-20m/ratings.csv') ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}) 运行时间从 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超过 10 倍。 RAPIDS cuML:用 GPU 加速 scikit-learn 同样是 k-最邻近问题。 import cudf from cuml.neighbors import NearestNeighbors df = cudf.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df) 运行时间从 CPU 上 1min52s,提升到 GPU 上 17.8s。 Mars 和 RAPIDS 结合能带来什么? RAPIDS 将 Python 数据科学带到了 GPU,极大地提升了数据科学的运行效率。它们和 Numpy 等一样,是命令式的。通过和 Mars 结合,中间过程将会使用更少的内存,这使得数据处理量更大;Mars 也可以将计算分散到多机多卡,以提升数据规模和计算效率。 在 Mars 里使用 GPU 也很简单,只需要在对应函数上指定 gpu=True。例如创建 tensor、读取 CSV 文件等都适用。 import mars.tensor as mt import mars.dataframe as md a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True) df = md.read_csv('ml-20m/ratings.csv', gpu=True) 下图是用 Mars 分别在 Scale up 和 Scale out 两个维度上加速蒙特卡洛计算 Pi 这个任务。一般来说,我们要加速一个数据科学任务,可以有这两种方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的内存、使用 GPU 替代 CPU等;Scale out 就是指用更多的机器,用分布式的方式提升效率。 可以看到在一台 24 核的机器上,Mars 计算需要 25.8s,而通过分布式的方式,使用 4 台 24 核的机器的机器几乎以线性的时间提升。而通过使用一个 NVIDIA TESLA V100 显卡,我们就能将单机的运行时间提升到 3.98s,这已经超越了4台 CPU 机器的性能。通过再将单卡拓展到多卡,时间进一步降低,但这里也可以看到,时间上很难再线性扩展了,这是因为 GPU 的运行速度提升巨大,这个时候网络、数据拷贝等的开销就变得明显。 性能测试 我们使用了 https://github.com/h2oai/db-benchmark 的数据集,测试了三个数据规模的 groupby 和 一个数据规模的 join。而我们主要对比了 pandas 和 DASK。DASK 和 Mars 的初衷很类似,也是试图并行和分布式化 Python 数据科学,但它们的设计、实现、分布式都存在较多差异,这个后续我们再撰文进行详细对比。 测试机器配置是 500G 内存、96 核、NVIDIA V100 显卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 执行计算。 Groupby 数据有三个规模,分别是 500M、5G 和 20G。 查询也有三组。 查询一 df = read_csv('data.csv') df.groupby('id1').agg({'v1': 'sum'}) 查询二 df = read_csv('data.csv') df.groupby(['id1', 'id2']).agg({'v1': 'sum'}) 查询三 df = read_csv('data.csv') df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'}) 数据大小 500M,性能结果 数据大小 5G,性能结果 数据大小 20G,性能结果 数据大小到 20G 时,pandas 在查询2会内存溢出,得不出结果。 可以看到,随着数据增加,Mars 的性能优势会愈发明显。 得益于 GPU 的计算能力,GPU 运算性能相比于 CPU 都有数倍的提升。如果单纯使用 RAPIDS cuDF,由于显存大小的限制,数据来到 5G 都难以完成,而由于 Mars 的声明式的特点,中间过程对显存的使用大幅得到优化,所以整组测试来到 20G 都能轻松完成。这正是 Mars + RAPIDS 所能发挥的威力。 Join 测试查询: x = read_csv('x.csv') y = read_csv('y.csv') x.merge(y, on='id1') 测试数据 x 为500M,y 包含10行数据。 总结 RAPIDS 将 Python 数据科学带到了 GPU,极大提升了数据分析和处理的效率。Mars 的注意力更多放在并行和分布式。相信这两者的结合,在未来会有更多的想象空间。 Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
PyODPS 提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。 笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 存储目标点经纬度坐标,共有 M 行数据,小表 Coordinates2 存储出发点经纬度坐标,共有 N 行数据,现在需要计算所有离目标点最近的出发点坐标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生 M * N 条数据,也就是一个笛卡尔积问题。 haversine 公式 首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可以使用 haversine 公式,使用 Python 的表达如下: def haversine(lat1, lon1, lat2, lon2): # lat1, lon1 为位置 1 的经纬度坐标 # lat2, lon2 为位置 2 的经纬度坐标 import numpy as np dlon = np.radians(lon2 - lon1) dlat = np.radians(lat2 - lat1) a = np.sin( dlat /2 ) **2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin( dlon /2 ) **2 c = 2 * np.arcsin(np.sqrt(a)) r = 6371 # 地球平均半径,单位为公里 return c * r MapJoin 目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只需要两个 dataframe join 时指定 mapjoin=True,执行时会对右表做 mapjoin 操作。 In [3]: df1 = o.get_table('coordinates1').to_df() In [4]: df2 = o.get_table('coordinates2').to_df() In [5]: df3 = df1.join(df2, mapjoin=True) In [6]: df1.schema Out[6]: odps.Schema { latitude float64 longitude float64 id string } In [7]: df2.schema Out[7]: odps.Schema { latitude float64 longitude float64 id string } In [8]: df3.schema Out[8]: odps.Schema { latitude_x float64 longitude_x float64 id_x string latitude_y float64 longitude_y float64 id_y string } 可以看到在执行 join 时默认会将重名列加上 _x 和 _y 后缀,可通过在 suffixes 参数中传入一个二元 tuple 来自定义后缀,当有了 join 之后的表后,通过 PyODPS 中 DataFrame 的自建函数就可以计算出距离,十分简洁明了,并且效率很高。 In [9]: r = 6371 ...: dis1 = (df3.latitude_y - df3.latitude_x).radians() ...: dis2 = (df3.longitude_y - df3.longitude_x).radians() ...: a = (dis1 / 2).sin() ** 2 + df3.latitude_x.radians().cos() * df3.latitude_y.radians().cos() * (dis2 / 2).sin() ** 2 ...: df3['dis'] = 2 * a.sqrt().arcsin() * r In [12]: df3.head(10) Out[12]: latitude_x longitude_x id_x latitude_y longitude_y id_y dis 0 76.252432 59.628253 0 84.045210 6.517522 0 1246.864981 1 76.252432 59.628253 0 59.061796 0.794939 1 2925.953147 2 76.252432 59.628253 0 42.368304 30.119837 2 4020.604942 3 76.252432 59.628253 0 81.290936 51.682749 3 584.779748 4 76.252432 59.628253 0 34.665222 147.167070 4 6213.944942 5 76.252432 59.628253 0 58.058854 165.471565 5 4205.219179 6 76.252432 59.628253 0 79.150677 58.661890 6 323.070785 7 76.252432 59.628253 0 72.622352 123.195778 7 1839.380760 8 76.252432 59.628253 0 80.063614 138.845193 8 1703.782421 9 76.252432 59.628253 0 36.231584 90.774527 9 4717.284949 In [13]: df1.count() Out[13]: 2000 In [14]: df2.count() Out[14]: 100 In [15]: df3.count() Out[15]: 200000 df3 已经是有 M * N 条数据了,接下来如果需要知道最小距离,直接对 df3 调用 groupby 接上 min 聚合函数就可以得到每个目标点的最小距离。 In [16]: df3.groupby('id_x').dis.min().head(10) Out[16]: dis_min 0 323.070785 1 64.755493 2 1249.283169 3 309.818288 4 1790.484748 5 385.107739 6 498.816157 7 615.987467 8 437.765432 9 272.589621 DataFrame 自定义函数 如果我们需要知道对应最小距离的点的城市,也就是表中对应的 id ,可以在 mapjoin 之后调用 MapReduce,不过我们还有另一种方式是使用 DataFrame 的 apply 方法。要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。 表资源 要注意 apply 是在服务端执行的 UDF,所以不能在函数内使用类似于df=o.get_table('table_name').to_df() 的表达式去获得表数据,具体原理可以参考PyODPS DataFrame 的代码在哪里跑。以本文中的情况为例,要想将表 1 与表 2 中所有的记录计算,那么需要将表 2 作为一个资源表,然后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只需要将一个 collection 传入 resources 参数即可。collection 是个可迭代对象,不是一个 DataFrame 对象,不可以直接调用 DataFrame 的接口,每个迭代值是一个 namedtuple,可以通过字段名或者偏移来取对应的值。 ## use dataframe udf df1 = o.get_table('coordinates1').to_df() df2 = o.get_table('coordinates2').to_df() def func(collections): import pandas as pd collection = collections[0] ids = [] latitudes = [] longitudes = [] for r in collection: ids.append(r.id) latitudes.append(r.latitude) longitudes.append(r.longitude) df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes}) def h(x): df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude) return df.iloc[df['dis'].idxmin()]['id'] return h df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute( libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz']) 在自定义函数中,将表资源通过循环读成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值对应的行,从而得到距离最近的出发点 id。另外,如果在自定义函数中需要使用到三方包(例如本例中的 pandas)可以参考这篇文章。 全局变量 当小表的数据量十分小的时候,我们甚至可以将小表数据作为全局变量在自定义函数中使用。 df1 = o.get_table('coordinates1').to_df() df2 = o.get_table('coordinates2').to_df() df = df2.to_pandas() def func(x): df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude) return df.iloc[df['dis'].idxmin()]['id'] df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute( libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz']) 在上传函数的时候,会将函数内使用到的全局变量(上面代码中的 df) pickle 到 UDF 中。但是注意这种方式使用场景很局限,因为 ODPS 的上传的文件资源大小是有限制的,所以数据量太大会导致 UDF 生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在数据量非常小的时候使用。 总结 使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,性能好,一般能用 mapjoin 解决的我们都推荐使用 mapjoin,并且最好使用内建函数计算,能到达最高的效率,但是它不够灵活。另一种是使用 DataFrame 自定义函数,比较灵活,性能相对差一点(可以使用 pandas 或者 numpy 获得性能上的提升),通过使用表资源,将小表作为表资源传入 DataFrame 自定义函数中,从而完成笛卡尔积的操作。
在使用 PyODPS DataFrame 编写数据应用时,尽管编写的是同一个脚本文件,但其中的代码会在不同位置执行,这可能导致一些无法预期的问题,本文介绍当出现相关问题时,如何确定代码在何处执行,以及提供部分场景下解决问题的方法。 概述 假定我们要执行下面的代码: from odps import ODPS, options import numpy as np o = ODPS(access_id, access_key, project, endpoint) df = o.get_table('pyodps_iris').to_df() coeffs = [0.1, 0.2, 0.4] def handle(v): import numpy as np return float(np.cosh(v)) * sum(coeffs) options.df.supersede_libraries = True val = df.sepal_length.map(handle).sum().execute(libraries=['numpy.zip', 'other.zip']) print(np.sinh(val)) 在开始分析之前,首先需要指出的是,PyODPS 是一个 Python 包而非 Python Implementation,PyODPS 的运行环境均为未经修改的 Python,因而并不会出现与正常 Python 解释器不一致的行为。亦即,你所写的每一条语句不会有与标准 Python 语句不同的行为,例如自动变成分布式代码,等等。 下面解释该代码的执行过程。 上图是执行上述代码时可能涉及的系统。代码本身执行的位置在图中用紫色表示,这些系统都位于 MaxCompute 外部,为了方便表述,下文称为“本地”。在本地执行的代码包括 handle 函数之外的部分(注意 handle 传入 map 时仅传入了函数本身而并未执行)。因而,这些代码在执行时,行为与普通 Python code 的执行行为类似,import 第三方包时,引用的是本地的包。因而,上面的代码中,libraries=['numpy.zip', 'other.zip']引用的other.zip因为并没有在本地安装,因而如果代码中有诸如 import other 这样的语句,会导致执行报错。即便 other.zip 已被上传到 MaxCompute 资源也是如此,因为本地根本没有这个包。理论上,本地代码如果不涉及 PyODPS 包,则与 PyODPS 无关,用户需要自行排查。 对于 handle 函数,情况发生了变化。handle 函数传入 map 方法时,如果使用的后端是 MaxCompute 后端,会先被 cloudpickle 模块提取闭包和字节码,此后 PyODPS DataFrame 会使用闭包和字节码生成一个 Python UDF,提交到 MaxCompute。最后,作业以 SQL 的形式在 MaxCompute 执行时,会调用这个 Python UDF,其中的字节码和闭包内容会被 unpickle,此后在 MaxCompute Executor 执行。由此可见,在上述代码中, 在 handle 函数体中的代码都不会在本地执行,而会在 MaxCompute Executor 中执行; handle 函数体中无法引用本地安装的包,只有在 MaxCompute Executor 中存在的包才有效; 上传的第三方包必须能够在 MaxCompute Executor 中的 Python 版本(目前为 Python 2.7,UCS2)中调用; handle 函数体中修改引用的外部变量(上述代码中的 coeffs)不会导致本地的 coeffs 值被修改; 如果在 handle 中引用在 handle 外 import 的包,在 handle 中调用可能会报错,因为在不同环境中,包的结构可能不同,而 cloudpickle 会将本地包的引用带到 MaxCompute Executor,导致报错,因而建议 import 在 handle 中进行; 由于使用 cloudpickle,如果在 handle 中调用了其他文件中的代码,该文件所在的包必须存在于 MaxCompute Executor 中。如果你不想使用第三方包的形式解决该问题,请将所有引用的个人代码放在同一个文件中。 上述对 handle 函数的解释对于自定义聚合、apply 和 map_reduce 中调用的自定义方法 / Agg 类均适用。如果使用的后端是 Pandas 后端,则所有代码都会在本地运行,因而本地也需要安装相关的包。但鉴于 Pandas 后端调试完毕后通常会转移到 MaxCompute 运行,建议在本地装包的同时,参照 MaxCompute 后端的惯例进行开发。 使用第三方包 个人电脑 / 自有服务器在本地使用第三方包 / 其他文件中的代码 在相应的 Python 版本上安装即可。 DataWorks 中本地使用其他文件中的代码 该部分功能由 DataWorks 提供,请参考 DataWorks 文档。 map / apply / map_reduce / 自定义聚合中使用第三方包 / 其他文件中的代码 参考 https://yq.aliyun.com/articles/591508 。需要补充的是,在 DataWorks 上上传资源后,需要点击“提交”确保资源被正确上传到 MaxCompute。如果需要使用自己的 Numpy 版本,在上传正确版本的 wheel 包的同时,需要配置 odps.df.supersede_libraries = True,同时确保你上传的 numpy 包名位于 libraries 的最前面,如果指定了 options.df.libraries,则 numpy 包名需要位于 options.df.libraries 的最前面。 引用其他 MaxCompute 表中的数据 个人电脑 / 自有服务器在本地访问 MaxCompute 表 如果 Endpoint 可以连接,使用 PyODPS / DataFrame 访问。 map / apply / map_reduce / 自定义聚合中访问其他 MaxCompute 表 MaxCompute Executor 中通常不支持访问 Endpoint / Tunnel Endpoint,其上也没有 PyODPS 包可用,因而不能直接使用 ODPS 入口对象或者 PyODPS DataFrame,也不能从自定义函数外部传入这些对象。如果表的数据量不大,建议将 DataFrame 作为资源传入(见 https://pyodps.readthedocs.io/zh_CN/latest/df-element.html#function-resource )。如果数据量较大,建议改写成 join。 访问其他服务 个人电脑 / 自有服务器在本地访问其他服务 保证自己的环境中可以正常访问相关服务,生产服务器可以联系 PE。 DataWorks 上的本地代码中访问其他服务 请咨询 DataWorks。 map / apply / map_reduce / 自定义聚合中访问其他服务 参考 https://yq.aliyun.com/articles/591508 启用 Isolation,如果仍然遇到网络报错,请联系 MaxCompute 用户群,找售后帮忙解决。
之前我们介绍过在 PyODPS DataFrame 中使用三方包。对于二进制包而言,MaxCompute 要求使用包名包含 cp27-cp27m 的 Wheel 包。但对于部分长时间未更新的包,例如 oss2 依赖的 crcmod,PyPI 并未提供 Wheel 包,因而需要自行打包。本文介绍了如何使用 quay.io/pypa/manylinux1_x86_64 镜像制作可在 MaxCompute 上使用的 Wheel 包。 本文参考 https://github.com/pypa/manylinux ,quay.io/pypa/manylinux1_x86_64 镜像也是目前绝大多数 Python 项目在 Travis CI 上打包的标准工具,如有进一步的问题可研究该项目。 准备依赖项 不少包都有依赖项,例如 devel rpm 包或者其他 Python 包,在打包前需要了解该包的依赖,通常可以在 Github 中找到安装或者打包的相关信息。对于 crcmod,除 gcc 外不再有别的依赖,因而此步可略去。 修改 setup.py 并验证(建议在 Mac OS 或者 Linux 下) 较旧的 Python 包通常不支持制作 Wheel 包。具体表现为在使用 python setup.py bdist_wheel 打包时报错。如果需要制作 Wheel 包,需要修改 setup.py 以支持 Wheel 包的制作。对于一部分包,可以简单地将 distutils 中的 setup 函数替换为 setuptools 中的 setup 函数。而对于部分自定义操作较多的 setup.py,需要详细分析打包过程,这一项工作可能会很复杂,本文就不讨论了。 例如,对于 crcmod,修改 setup.py 中的 from distutils.core import setup 为 from setuptools import setup 即可。 修改完成后,在项目根目录执行 python setup.py bdist_wheel 如果没有报错且生成的 Wheel 包可在本地使用,说明 setup.py 已可以使用。 准备打包脚本 在项目中新建 bin 目录,并在其中创建 build-wheel.sh: mkdir bin && vim bin/build-wheel.sh 在其中填入以下内容: #!/bin/bash # modified from https://github.com/pypa/python-manylinux-demo/blob/master/travis/build-wheels.sh set -e -x # Install a system package required by our library # 将这里修改为安装依赖项的命令 # Compile wheels PYBIN=/opt/python/cp27-cp27m/bin # 如果包根目录下有 dev-requirements.txt,取消下面的注释 # "${PYBIN}/pip" install -r /io/dev-requirements.txt "${PYBIN}/pip" wheel /io/ -w wheelhouse/ # Bundle external shared libraries into the wheels for whl in wheelhouse/*.whl; do auditwheel repair "$whl" -w /io/wheelhouse/ done 将第一步获知的依赖项安装脚本填入此脚本,在使用 python 或 pip 时,注意使用 /opt/python/cp27-cp27m/bin 中的版本。 最后,设置执行权限 chmod a+x bin/build-wheel.sh 打包 使用 Docker 下载所需的镜像(本步需要使用 Docker,请提前安装),此后在项目根目录下打包: docker pull quay.io/pypa/manylinux1_x86_64 docker run --rm -v `pwd`:/io quay.io/pypa/manylinux1_x86_64 /io/bin/build-wheel.sh 完成的 Wheel 包位于项目根目录下的 wheelhouse 目录下。
源于MaxCompute,阿里首款自研科学计算引擎Mars1月16日开源发布https://promotion.aliyun.com/ntms/act/cloud/marsfbh.html Mars 是一个基于矩阵的统一分布式计算框架,在之前的文章中已经介绍了 Mars 是什么, 以及 Mars 分布式执行 ,而且 Mars 已经在 GitHub 中开源。当你看完 Mars 的介绍可能会问它能做什么,这几乎取决于你想做什么,因为 Mars 作为底层运算库,实现了 numpy 70% 的常用接口。这篇文章将会介绍如何使用 Mars 完成你想做的事情。 奇异值分解 (SVD) 在处理纷繁的数据时,作为数据处理者,首先想到的就是降维,SVD 就是其中一种比较常见的降维方法,在 numpy.linalg 模块中就有 svd 方法,当我们有20000个100维的数据需要处理,调用 SVD 接口: In [1]: import numpy as np In [2]: a = np.random.rand(20000, 100) In [3]: %time U, s, V = np.linalg.svd(a) CPU times: user 4min 3s, sys: 10.2 s, total: 4min 13s Wall time: 1min 18s 可以看到即使 Numpy 使用了 mkl 加速,也需要1分多钟的运行时间,当数据量更大时,单机的内存已经无法处理。 Mars 也实现了 SVD ,但是它比 Numpy 有更快的速度,因为利用矩阵分块计算的算法,能够并行计算: In [1]: import mars.tensor as mt In [2]: a = mt.random.rand(20000, 100, chunk_size=100) In [3]: %time U, s, V = mt.linalg.svd(a).execute() CPU times: user 5.42 s, sys: 1.49 s, total: 6.91 s Wall time: 1.87 s 可以看到在相同数据量情况下,Mars 有几十倍速度的提升,仅仅需要1秒多钟就可以解决20000数据量的降维问题。想象一下淘宝用户数据做矩阵分解时,分布式的矩阵运算就显现出其价值。 主成分分析 (PCA) 提到降维,主成分分析也是一种重要的手段。PCA 会选取包含信息量最多的方向对数据进行投影,其投影方向可以从最大化方差或者最小化投影误差两个角度理解。也就是通过低维表征的向量和特征向量矩阵,可以基本重构出所对应的原始高维向量。其最主要的公式如下所示: $$ \mathop {\max }\limits_{{\mu _j}} \frac{1}{n}{\sum\limits_i^n {\left( {{x_i}{\mu _j} - \overline x } \right)} ^T}\left( {{x_i}{\mu _j} - \overline x } \right) = {\mu _j}^TC{\mu _j} $$ $x_i$为每个样本的数据,$\mu _j$为新的投影方向,我们的目标就是使得投影方差最大化,从而找到主特征。上面式子中的矩阵$C$在数学中可以用协方差矩阵表示,当然首先要对输入的样本做中心化调整。我们可以用随机产生的数组看一下 Numpy 是如何实现 PCA 降维操作: import numpy as np a = np.random.randint(0, 256, size=(100, 10000)) a_mean = a.mean(axis=1, keepdims=True) a_new = a - a_mean cov_a = (a_new.dot(a_new.T)) / a.shape[0] #利用SVD求协方差矩阵前20个特征值 U, s, V = np.linalg.svd(cov_a) vecs = V.T[:, :20] #用低纬度的特征向量表示原数据 a_transformed = a.T.dot(vecs) 由于随机产生的数据本身就没有太强的特征,所以在100维数据中象征性的取出前20维,一般可以用特征值的比例取总和的前99%之类的数值。 再看一下 Mars 是如何实现的: import mars.tensor as mt a = mt.random.randint(0, 256, size=(100, 10000)) a_mean = a.mean(axis=1, keepdims=True) a_new = a - a_mean cov_a = (a_new.dot(a_new.T)) / a.shape[0] #利用SVD求协方差矩阵前20个特征值 U, s, V = mt.linalg.svd(cov_a) vecs = V.T[:, :20] #用低纬度的特征向量表示原数据 a_transformed = a.T.dot(vecs).execute() 可以看到除了 import 的不同,再者就是对最后需要数据的变量调用 execute 方法,甚至在未来我们做完 eager 模式后, execute 都可以省去,以前用 Numpy 写的算法可以几乎无缝转化成多进程以及分布式的程序,再也不用自己手动去写MapReduce。 人脸识别 当 Mars 实现了基础算法时,便可以使用到实际的算法场景中。PCA最著名的应用就是人脸特征提取以及人脸识别,单个人脸图片的维度很大,分类器很难处理,早起比较知名的人脸识别 Eigenface 算法就是采用PCA算法。本文以一个简单的人脸识别程序作为例子,看看 Mars 是如何实现该算法的。 本文的人脸数据库用的是ORL face database,有40个不同的人共400张人脸图片,每张图片为 92*112 像素的灰度图片。这里选取每组图片的第一张人脸图片作为测试图片,其余九张图片作为训练集。 首先利用 python 的 OpenCV 的库将所有图片读取成一个大矩阵,也就是 360*10304大小的矩阵,每一行是每个人脸的灰度值,一共有360张训练样本。利用 PCA 训练数据,data_mat 就是输入的矩阵,k 是需要保留的维度。 import mars.tensor as mt from mars.session import new_session session = new_session() def cov(x): x_new = x - x.mean(axis=1, keepdims=True) return x_new.dot(x_new.T) / (x_new.shape[1] - 1) def pca_compress(data_mat, k): data_mean = mt.mean(data_mat, axis=0, keepdims=True) data_new = data_mat - data_mean cov_data = cov(data_new) U, s, V = mt.linalg.svd(cov_data) V = V.T vecs = V[:, :k] data_transformed = vecs.T.dot(data_new) return session.run(data_transformed, data_mean, vecs) 由于后续做预测识别,所以除了转化成低维度的数据,还需要返回平均值以及低维度空间向量。可以看到中间过程平均脸的样子,前几年比较火的各地的平均脸就可以通过这种方式获取,当然这里的维度以及样本比较少,大概只能看出个人脸的样子。 其实 data_transformed 中保存的特征脸按照像素排列之后也能看出特征脸的形状。图中有15个特征脸,足以用来做一个人脸分类器。 另外在函数 PCA 中用了 session.run 这个函数,这是由于三个需要返回的结果并不是相互独立的,目前的延迟执行模式下提交三次运算会增加运算量,同一次提交则不会,当然立即执行模式以及运算过的部分图的剪枝工作我们也在进行中。 当训练完成之后,就可以利用降维后的数据做人脸识别了。将之前非训练样本的图片输入,转化成降维后的维度表示,在这里我们就用简单的欧式距离判断与之前训练样本中每个人脸数据的差距,距离最小的就是识别出的人脸,当然也可以设置某个阈值,最小值超过阈值的判断为识别失败。最终在这个数据集下跑出来的准确率为 92.5%,意味着一个简单的人脸识别算法搭建完成。 # 计算欧氏距离 def compare(vec1, vec2): distance = mt.dot(vec1, vec2) / (mt.linalg.norm(vec1) * mt.linalg.norm(vec2)) return distance.execute() 未来 上文展示了如何利用 Mars 一步一步地完成人脸识别小算法的过程,可以看到 Mars 类 Numpy 的接口对算法开发人员十分友好,算法规模超出单机能力时,不再需要关注如果扩展到分布式环境,Mars 帮你处理背后所有的并行逻辑。 当然,Mars 还有很多可以改进的地方,比如在 PCA 中对协方差矩阵的分解,可以用特征值、特征向量计算,计算量会远小于 SVD 方法,不过目前线性代数模块还没有实现计算特征向量的方法,这些特性我们会一步步完善,包括 SciPy 里各种上层算法接口的实现。大家有需求的可以在 GitHub 上提 issue 或者帮助我们共建 Mars。 Mars 作为一个刚刚开源的项目,十分欢迎提出其他任何想法与建议,我们需要大家的加入,让 Mars 越来越好。 Mars user group 钉钉群扫码加入。
先前,我们已经介绍过 Mars 是什么。如今 Mars 已在 Github 开源并对内上线试用,本文将介绍 Mars 已实现的分布式执行架构,欢迎大家提出意见。 架构 Mars 提供了一套分布式执行 Tensor 的库。该库使用 mars.actors 实现的 Actor 模型编写,包含 Scheduler、Worker 和 Web 服务。 用户向 Mars Web Service 提交的是由 Tensor 组成的 Graph。Web Service 接收这些图并提交到一台 Scheduler。在提交作业到各个 Worker 之前,Mars Scheduler 先将 Tensor 图编译成一张由 Chunk 和 Operand 组成的图,此后对图进行分析和切分。此后,Scheduler 在所有 Scheduler 中根据一致性哈希创建一系列控制单个 Operand 执行的 OperandActor。Operand 以符合拓扑序的顺序进行调度,当所有 Operand 完成执行,整张图将被标记为已完成,客户端能够从 Web 中拉取结果。整个执行过程如下图所述。 作业提交 用户端通过 RESTful API 向 Mars 服务提交作业。用户通过编写 Tensor 上的代码,此后通过 session.run(tensor) 将 Tensor 操作转换为 Tensor 构成的 Graph 并提交到 Web API。此后,Web API 将作业提交到 SessionActor 并在集群中创建一个 GraphActor 用于图的分析和管理。用户端则开始查询图的执行状态,直至执行结束。 在 GraphActor 中,我们首先根据 chunks 设置将 Tensor 图转换为 Operand 和 Chunk 组成的图,这一过程使得图可以被进一步拆分并能够并行执行。此后,我们在图上进行一系列的分析以获得 Operand 的优先级,同时向起始 Operand 指派 Worker,关于这一部分的细节可以参考 准备执行图 章节。此后,每个 Operand 均建立一个 OperandActor 用于控制该 Operand 的具体执行。当 Operand 处于 READY状态(如同在 Operand 状态 章节描述的那样),Scheduler 将会为 Operand 选择目标 Worker,随后作业被提交 Worker 进行实际的执行。 执行控制 当一个 Operand 被提交到 Worker,OperandActor 等待 Worker 上的回调。如果 Operand 执行成功,Operand 的后继将被调度。如果 Operand 执行失败,OperandActor 将会尝试数次,如果仍失败则将此次执行标记为失败。 取消作业 用户端可以使用 RESTful API 取消运行中的作业。取消请求将被写入 Graph 的状态存储中,同时 GraphActor 上的取消接口将被调用。如果作业在准备阶段,它将在检测到停止请求后立即结束,否则请求将被下发到每个 OperandActor,并设置状态为 CANCELLING。如果此时 Operand 没有运行,Operand 状态将被直接置为 CANCELLED。如果 Operand 正在运行,停止请求将被下发到 Worker 中并导致一个 ExecutionInterrupted 错误,该错误将返回给 OperandActor,此时 Operand 的状态将被标记为 CANCELLED。 准备执行图 当一个 Tensor 图被提交到 Mars Scheduler,一张包含更细粒度的,由 Operand 和 Chunk 构成的图将根据数据源中包含的 chunks 参数被生成。 图压缩 当完成 Chunk 图的生成后,我们将会通过合并图中相邻的节点来减小图的规模,这一合并也能让我们充分利用 numexpr 这样的加速库来加速计算过程。目前 Mars 仅会合并形成单条链的 Operand。例如,当执行下面的代码 import mars.tensor as mt a = mt.random.rand(100, chunks=100) b = mt.random.rand(100, chunks=100) c = (a + b).sum() Mars 将会合并 Operand ADD 和 SUM 成为 FUSE 节点。RAND Operand 不会被合并,因为它们并没有和 ADD 及 SUM 组成一条简单的直线。 初始 Worker 分配 为 Operand 分配 Worker 对于图执行的性能而言至关重要。随机分配初始 Operand 可能导致巨大的网络开销,并有可能导致不同 Worker 间作业分配的不平衡。因为非初始节点的分配能够根据其前驱生成数据的物理分布及各个 Worker 的空闲情况方便地确定,在执行图准备阶段,我们只考虑初始 Operand 的分配问题。 初始 Worker 分配需要遵循几个准则。首先,分配给每个 Worker 执行的 Operand 需要尽量保持平衡满,这能够使计算集群在整个执行阶段都有较高的利用率,这在执行的最后阶段显得尤其重要。其次,初始节点分配需要使后续节点执行时的网络”传输尽量小。也就是说,初始点分配需要充分遵循局部性原则。 需要注意的是,上述准则在某些情况下会彼此冲突。一个网络传输量最小的分配方案可能会非常偏斜。我们开发了一套启发式算法来获取两个目标的平衡,该算法描述如下: 选择列表中的第一个初始节点和第一台机器; 从 Operand 图转换出的无向图中自该点开始进行深度优先搜索; 如果另一个未被分配的初始节点被访问到,我们将其分配给步骤1中选择的机器; 当访问到的 Operand 总数大于平均每个 Worker 接受的 Operand 个数时,停止分配; 前往步骤1,如果仍有 Worker 未被分配 Operand,否则结束。 调度策略 当一个 Operand 组成的 Graph 执行时,合适的执行顺序会减少集群中暂存的数据总量,从而减小数据被 Spill 到磁盘的可能性。合适的 Worker 能够减少执行时网络传输的总量。 Operand 选择策略 合适的执行顺序能够显著减小集群中暂存的数据总量。下图中展示了 Tree Reduction 的例子,圆形代表 Operand,方形代表 Chunk,红色代表 Operand 正在执行,蓝色代表 Operand 可被执行,绿色代表 Operand 产生的 Chunk 已被存储,灰色代表 Operand 及其相关数据已被释放。假设我们有两台 Worker,并且每个 Operand 的资源使用量均相等,每张图展示的是不同策略下经过5个时间单元的执行后的状态。左图展示的是节点依照层次分别执行,而右图展示的是依照接近深度优先的顺序执行。左图中,有6个 Chunk 的数据需要暂存,右图只有2个。 因为我们的目标是减少存储在集群中的数据总数,我们为进入 READY 状态的 Operand 设定了一套优先级策略: 深度更大的 Operand 需要被优先执行; 被更深的 Operand 依赖的 Operand 需要被优先执行; 输出规模更小的节点需要被优先执行。 Worker 选择策略 当 Scheduler 准备执行图时,初始 Operand 的 Worker 已被确定。我们选择后续 Operand 分配 Worker 的依据是输入数据所在的 Worker。如果某个 Worker 拥有的输入数据大小最大,则该 Worker 将被选择用于执行后续 Operand。如果这样的 Worker 有多个,则各个候选 Worker 的资源状况将起到决定作用。 Operand 状态 Mars 中的每一个操作符都被一个 OperandActor 单独调度。执行的过程是一个状态转移的过程。在 OperandActor 中,我们为每一个状态的进入过程定义一个状态转移函数。起始 Operand 在初始化时位于 READY 状态,非起始 Operand 在初始化时则位于 UNSCHEDULED 状态。当给定的条件满足,Operand 将转移到另一个状态并执行相应的操作。状态转移的流程可以参考下图: 我们在下面描述每个状态的含义及 Mats 在这些状态下执行的操作。 UNSCHEDUED:一个 Operand 位于此状态,当它的上游数据没有准备好。 READY:一个 Operand 位于此状态,当所有上游输入数据均已准备完毕。在进入这一状态时,OperandActor 向 AssignerActor 中选择的所有 Worker 提交作业。如果某一 Worker 准备运行作业,它将向 Scheduler 发送消息,Scheduler 将向其他 Worker 发送停止运行的消息,此后向该 Worker 发送消息以启动作业执行。 RUNNING:一个 Operand 位于此状态,当它的执行已经启动。在进入此状态时,OperandActor 会检查作业是否已经提交。如果尚未提交,OperandActor 将构造一个由 FetchChunk Operand 和当前 Operand 组成的图,并将其提交到 Worker 中。此后,OperandActor 会在 Worker 中注册一个回调来获取作业执行完成的消息。 FINISHED:一个 Operand 位于此状态,当作业执行已完成。当 Operand 进入此状态,且 Operand 无后继,一个消息将被发送到 GraphActor 以决定是否整个 Graph 的执行都已结束。与此同时,OperandActor 向它的前驱和后继发送执行完成的消息。如果一个前驱收到此消息,它将检查是否所有的后继都已执行完成。如是,当前 Operand 上的数据可以被释放。如果一个后继收到此消息,它将检查是否所有的前驱已完成。如是,该后继的状态可以转移到 READY。 FREED:一个 Operand 位于此状态,当其上所有数据都已被释放。 CANCELLED:一个 Operand 位于此状态,当所有重新执行的尝试均告失败。当 Operand 进入此状态,它将把相同状态传递到后继节点。 CANCELLING:一个 Operand 位于此状态,当它正在被取消执行。如果此前作业正在执行,一个取消执行的请求会被发送到 Worker 上。 CANCELLED:一个 Operand 位于此状态,当执行已被取消并停止运行。如果执行进入这一状态,OperandActor 会尝试将书友的后继都转为 CANCELLING。 Worker 中的执行细节 一个 Mars Worker 包含多个进程,以减少全局解释器锁(GIL)对执行的影响。具体的执行在独立的进程中完成。为减少不必要的内存拷贝和进程间通讯,Mars Worker 使用共享内存来存储执行结果。 当一个作业被提交到 Worker,它将首先被置于队列中等待分配内存。当内存被分配后,其他 Worker 上的数据,或者当前 Worker 上已被 spill 到磁盘的数据将会被重新载入内存中。此时,所有计算需要的数据已经都在内存中,真正的计算过程将启动。当计算完成,Worker 将会把作业放到共享存储空间中。这四种执行状态的转换关系见下图。 执行控制 Mars Worker 通过 ExecutionActor 控制所有 Operand 在 Worker 中的执行。该 Actor 本身并不参与实际运算或者数据传输,只是向其他 Actor 提交任务。 Scheduler 中的 OperandActor 通过 ExecutionActor 上的 enqueue_graph 调用向 Worker 提交作业。Worker 接受 Operand 提交并且将其换存在队列中。当作业可以执行时,ExecutionActor 将会向 Scheduler 发送消息,Scheduler 将确定是否将执行该操作。当 Scheduler 确定在当前 Worker 上执行 Operand,它将调用 start_execution 方法,并通过 add_finish_callback注册一个回调。这一设计允许执行结果被多个位置接收,这对故障恢复有价值。 ExecutionActor 使用 mars.promise 模块来同时处理多个 Operand 的执行请求。具体的执行步骤通过 Promise 类的 then 方法相串联。当最终的执行结果被存储,之前注册的回调将被触发。如果在之前的任意执行步骤中发生错误,该错误会被传导到最后 catch 方法注册的处理函数中并得到处理。 Operand 的排序 所有在 READY 状态的 Operand 都被提交到 Scheduler 选择的 Worker 中。因此,在执行的绝大多数时间里,提交到 Operand 的 Worker 个数通常都高于单个 Worker 能够处理的 Operand 总数。因此,Worker 需要对 Operand 进行排序,此后选择一部分 Worker 来执行。这一排序过程在 TaskQueueActor 中进行,该 Actor 中维护一个优先队列,其中存储 Operand 的相关信息。与此同时,TaskQueueActor 定时运行一个作业分配任务,对处于优先队列头部的 Operand 分配执行资源直至没有多余的资源来运行 Operand,这一分配过程也会在新 Operand 提交或者 Operand 执行完成时触发。 内存管理 Mars Worker 管理两部分内存。第一部分是每个 Worker 进程私有的内存空间,由每个进程自己持有。第二部分是所有进程共享的内存空间,由 Apache Arrow 中的 plasma_store 持有。 为了避免进程内存溢出,我们引入了 Worker 级别的 QuotaActor,用于分配进程内存。当一个 Operand 开始执行前,它将为输入和输出 Chunk 向 QuotaActor 发送批量内存请求。如果剩余的内存空间可以满足请求,该请求会被 QuotaActor 接受。否则,请求将排队等待空闲资源。当相关内存使用被释放,请求的资源会被释放,此时,QuotaActor 能够为其他 Operand 分配资源。 共享内存由 plasma_store 管理,通常会占据整个内存的 50%。由于不存在溢出的可能,这部分内存无需经过 QuotaActor 而是直接通过 plasma_store 的相关方法进行分配。当共享内存使用殆尽,Mars Worker 会尝试将一部分不在使用的 Chunk spill 到磁盘中,以腾出空间容纳新的 Chunk。 从共享内存 spill 到磁盘的 Chunk 数据可能会被未来的 Operand 重新使用,而从磁盘重新载入共享内存的操作可能会非常耗费 IO 资源,尤其在共享内存已经耗尽,需要 spill 其他 Chunk 到磁盘以容纳载入的 Chunk 时。因此,当数据共享并不需要时,例如该 Chunk 只会被一个 Operand 使用,我们会将 Chunk 直接载入进程私有内存中,而不是共享内存,这可以显著减少作业总执行时间。 未来工作 Mars 目前正在快速迭代,近期将考虑实现 Worker 级别的 failover 及 shuffle 支持,Scheduler 级别的 failover 也在计划中。 Mars user group 钉钉群扫码加入。
最近,在 PyCon China 2018 的北京主会场、成都和杭州分会场都分享了我们最新的工作 Mars,基于矩阵的统一计算框架。本文会以文字的形式对 PyCon 中国上的分享再进行一次阐述。 听到 Mars,很多第一次听说的同学都会灵魂三问:Mars 是什么,能做什么,怎么做的。今天我们就会从背景,以及一个例子出发,来回答这几个问题。 背景 首先是 scipy 技术栈的全景图,numpy 是基础,它提供了多维数组的数据结构,并提供了它上面的各种计算。再往上,重要的有 scipy,主要面向各种科学计算的操作;pandas,其中核心的概念是 DataFrame,他提供对表类型数据的处理、清洗等功能。往上一层,比较经典的库,有 scikit-learn,它是最知名的机器学习框架之一。最上面一层,是各种垂直领域的库,如 astropy 主要面向天文,biopython 面向生物领域等。 从 scipy 技术栈可以看出,numpy 是一个核心的地位,大量上层的库都使用了 numpy 的数据结构和计算。 我们真实世界的数据,并不只是表这种二维类型数据那么简单,很多时候,我们要面对的往往是多维数据,比如我们常见的图片处理,首先我们有图片的个数,然后有图片的长宽,以及 RGBA 通道,这就是四维的数据;这样的例子不胜枚举。有这样多维的处理能力,就有处理各种更加复杂,甚至是科学领域的能力;同时,由于多维数据本身包含二维数据,所以,我们也因此具备表类型数据的处理能力。 另外,如果我们需要探究数据的内在,光靠对表数据进行一些统计等操作是绝对不够的,我们需要更深层的“数学” 的方法,比如运用矩阵乘法、傅里叶变换等等的能力,来对数据进行更深层次的分析。而 numpy 由于是数值计算的库,加上各种上层的库,我们认为它们很适合用来提供这方面的能力。 为什么要做 Mars,从一个例子开始 那么,为什么要做 Mars 这个项目呢?我们不妨从一个例子来看。 我们试图用蒙特卡洛方法来求解 pi,蒙特卡洛方法其实很简单,就是用随机数的方法来解决特定的问题。如图,这里我们有个半径为1的圆和边长为2的正方形,我们生成很多随机点的方式,通过右下角的公式,我们就可以计算出 pi 的值为 4 乘以落在圆里点的个数除以总的点个数。随机生成的点越多,计算出来的 pi 就越精确。 用纯 Python 实现非常简单,我们只要遍历 N 次,生成 x 和 y 点,计算是不是落在圆内即可。运行1千万个点,需要超过10秒的时间。 Cython 是常见加速 Python 代码的方式,Cython 定义了 Python 语言的超集,把这个语言翻译到 c/c++,然后再进行编译来加速执行。这里,我们增加了几个变量的类型,可以看到比纯 Python 提升了 40% 的性能。 Cython 现在已经成为 Python 项目的标配,核心的 Python 三方库基本都使用 Cython 来加速 Python 代码的性能。 我们这个例子里的数据都是一个类型,我们可以想到用专门的数值计算的库,通过矢量化的方式,能极快加速这个任务的性能。numpy 就是当仁不让的选择了,使用 numpy,我们需要的是面向 array 的思维方式,我们应当减少使用循环。这里先用 numpy.random.uniform 来生成 N*2 的一个二维数组,然后 data ** 2 会对该数组里的所有数据做平方操作,然后 sum(axis=1) ,会对 axis=1 也就是行方向上求和,这个时候,得到的是长度为 N 的 vector,然后我们用 numpy.sqrt 来对这个 vector 的每一个值求开方,<1 会得到一个布尔值的 vector,即每个点是不是都是落在圆里,最后接一个 sum,就可以求出来总共的点的个数。初次上手 numpy 可能会不太习惯,但是用多了以后,就会发现这种写法的方便,它其实是非常符合直觉的。 可以看到,通过使用 numpy,我们写出了更简单的代码,但是性能确大幅提升,比纯 Python 的写法性能提升超过 10 倍。 那么 numpy 的代码还能够优化么,答案是肯定的,我们通过一个叫 numexpr 的库,来将 numpy 的多个操作合并成一个操作执行,来加速 numpy 的执行。 可以看到,通过 numexpr 优化的代码,性能比纯 Python 代码提升超过 25 倍。 此时的代码运行已经相当快了,如果我们手上有 GPU,那么我们可以利用硬件来加速任务执行。 这里必须要安利一个库,叫 cupy,他提供了和 numpy 一致的 API,通过简单的 import 替换,就能让 numpy 代码跑在英伟达的显卡之上。 这时可以看到,性能大幅提升超过 270 倍。真的非常夸张了。 为了让蒙特卡洛方法计算的结果更加精确,我们把计算量扩大 1000 倍。会碰到什么情况呢? 没错,这就是大家不时碰到的,OutOfMemory,内存溢出。更惨的是,在 jupyter 里,有时候内存溢出导致进程被杀,甚至会导致之前跑的全部结果都丢失。 蒙特卡洛方法还是比较容易处理的,我把问题分解成 1000 个,每个求解1千万数据就好了嘛,写个循环,做个汇总。但此时,整个计算的时间来到了12分钟多,太慢了。 此时我们会发现,整个运行过程中,其实只有一个 CPU 在干活,其他核都在原地吆喝。那么,我们怎么让 numpy 并行化呢? 首先,numpy 里有一些操作是能并行的,比如 tensordot 来做矩阵乘法,其他大部分操作都不能利用多核。那么,要将 numpy 并行化,我们可以: 采用多线程/多进程编写任务 分布式化 蒙特卡洛方法算 pi 改写成多线程和多进程实现还是非常容易的,我们写一个函数来处理1千万的数据,我们把这个函数通过 concurrent.futures 的 ThreadPoolExecutor 和 ProcessPoolExecutor 来分别提交函数 1000 遍用多线程和多进程执行即可。可以看到性能能提升到 2倍和3倍。 但是呢,蒙特卡洛求解 pi 本身就很容易手写并行,考虑更复杂的情况。 import numpy as np a = np.random.rand(100000, 100000) (a.dot(a.T) - a).std() 这里创建了 10万*10万的矩阵 a,输入就有大概 75G,我们拿 a 矩阵乘 a 的转置,再减去 a 本身,最终求标准差。这个任务的输入数据就很难塞进内存,后续的手写并行就更加困难。 这里问题就引出来了,我们需要什么样框架呢? 提供熟悉的接口,像 cupy 这样,通过简单的 import 替换,就能让原来 numpy 写的代码并行。 具备可扩展性。小到单机,也可以利用多核并行;大到一个很大的集群,支持上千台机器的规模来一起分布式处理任务。 支持硬件加速,支持用 GPU 等硬件来加速任务执行。 支持各种优化,比如操作合并,能利用到一些库来加速执行合并的操作。 我们虽然是内存计算的,但不希望单机或者集群内存不足,任务就会失败。我们应当让暂时用不到的数据 spill 到磁盘等等存储,来保证即使内存不够,也能完成整个计算。 Mars 是什么,能做什么事 Mars 就是这样一个框架,它的目标就是解决这几个问题。目前 Mars 包括了 tensor :分布式的多维矩阵计算。 100亿大小的蒙特卡洛求解 pi的问题规模是 150G,它会导致 OOM。通过 Mars tensor API,只需要将 import numpy as np 替换成 import mars.tensor as mt,后续的计算完全一致。不过有一个不同,mars tensor 需要通过 execute 触发执行,这样做的好处是能够对整个中间过程做尽量多的优化,比如操作合并等等。不过这种方式对 debug 不太友好,后续我们会提供 eager mode,来对每一步操作都触发计算,这样就和 numpy 代码完全一致了。 可以看到这个计算时间和手写并行时间相当,峰值内存使用也只是 1个多G,因此可以看到 Mars tensor 既能充分并行,又能节省内存的使用 。 目前,Mars 实现了 70% 的常见 numpy 接口,完整列表见 这里。我们一致在努力提供更多 numpy 和 scipy 的接口,我们最近刚刚完成了对逆矩阵计算的支持。 Mars tensor 也提供了对 GPU 和稀疏矩阵的支持。eye 是创建单位对角矩阵,它只有对角线上有值为1,如果用稠密的方式存储会浪费存储。不过目前,Mars tensor 还只支持二维稀疏矩阵。 Mars 怎么做到并行和更省内存 和所有的 dataflow 的框架一样,Mars 本身也有计算图的概念,不同的是,Mars 包含粗粒度图和细粒度图的概念,用户写的代码在客户端生成粗粒度图,在提交到服务端后,会有 tile 的过程,将粗粒度图 tile 成细粒度图,然后我们会调度细粒度图执行。 这里,用户写下的代码,在内存里会表达成 Tensor 和 Operand 构成的粗粒度图。 当用户调用 execute 方法时,粗粒度的图会被序列化到服务端,反序列化后,我们会把这个图 tile 成细粒度图。对于输入 10002000 的矩阵,假设指定每个维度上的 chunk 大小都是 500,那它会被 tile 成 24 一共 8 个chunk。 后续,我们会对每个我们实现的 operand 也就是算子提供 tile 的操作,将一个粗粒度的图 tile 成细粒度图。这时,我们可以看到,在单机,如果有8个核,那么我们就可以并行执行整个细粒度图;另外给定 1/8 大小的内存,我们就可以完成整个图的计算。 不过,我们在真正执行前,会对整个图进行 fuse 也就是操作合并的优化,这里的三个操作真正执行的时候,会被合并成一个算子。针对执行目标的不同,我们会使用 numexpr 和 cupy 的 fuse 支持来分别对 CPU 和 GPU 进行操作合并执行。 上面的例子,都是我们造出来很容易并行的任务。如我们先前提到的例子,通过 tile 之后生成的细粒度图其实是非常复杂的。真实世界的计算场景,这样的任务其实是很多的。 为了将这些复杂的细粒度图能够充分调度执行,我们必须要满足一些基本的准则,才能让执行足够高效。 首先,初始节点的分配非常重要。比如上图,假设我们有两个 worker,如果我们把 1和3 分配到一个 worker,而将 2和4 分配到另一个 worker,这时当 5 或者 6 被调度的时候,他们就需要触发远程数据拉取,这样执行效率会大打折扣。如果我们一开始将 1和2 分配到一个 worker,将 3和4 分配到另一个 worker,这时执行就会非常高效。初始节点的分配对整体的执行影响是很大的,这就需要我们对整个细粒度的图有个全局的掌握,我们才能做到比较好的初始节点分配。 另外,深度优先执行的策略也是相当重要的。假设这时,我们只有一个 worker,执行完 1和2 后,我们调度了 3 的话,就会导致 1和2 的内存不能释放,因为 5 此时还没有被触发执行。但是,如果我们执行完 1和2 后,调度了 5 执行,那么当 5 执行完后,1和2 的内存就可以释放,这样整个执行过程中的内存就会是最省的。 所以,初始节点分配,以及深度优先执行是两个最基本的准则,光有这两点是远远不够的,mars 的整个执行调度中有很多具有挑战的任务,这也是我们需要长期优化的对象。 Mars 分布式 所以,Mars 本质上其实是一个细粒度的,异构图的调度系统。我们把细粒度的算子调度到各个机器上,在真正执行的时候其实是调用 numpy、cupy、numexpr 等等的库。我们充分利用了成熟的、高度优化的单机库,而不是重复在这些领域造轮子。 在这个过程中,我们会遇到一些难点: 因为我们是 master-slave 架构,我们 master 如何避免单点? 我们的 worker 如何避免 Python 的 GIL(全局解释器锁)的限制? Master 的控制逻辑交错复杂,我们很容易写出来高耦合的,又臭又长的代码,我们如何将代码解耦? 我们的解法是使用 Actor model。Actor模型定义了并行的方式,也就是一切皆 Actor,每个 Actor 维护一个内部状态,它们都持有邮箱,Actor 之间通过消息传递,消息收到会放在邮箱中,Actor 从邮箱中取消息进行处理,一个 Actor 同时只能处理一个消息。Actor 就是一个最小的并行单元,由于一个 Actor 同时只能处理一个消息,你完全不需要担心并发的问题,并发应当是 Actor 框架来处理的。而所有 Actor 是不是在同一台机器上,这在 Actor 模型里也变得不重要,Actor 在不同机器上,只要能完成消息的传递就可以了,这样 Actor 模型也天然支持分布式系统。 因为 Actor 是最小的并行单元,我们在写代码的时候,可以将整个系统分解成很多 Actor,每个 Actor 是单一职责的,这有点类似面向对象的思想,这样让我们的代码得以解耦。 另外,Master 解耦成 Actor 之后,我们可以让这些 Actor 分布在不同的机器上,这样就让 Master 不再成为单点。同时,我们让这些 Actor 根据一致性哈希来进行分配,后续如果有 scheduler 机器挂掉, Actor 可以根据一致性哈希重新分配并重新创建来达到容错的目的。 最后,我们的 actors 是跑在多进程上的,每个进程里是很多的协程,这样,我们的 worker 也不会受到 GIL 的限制。 像 Scala 或者 Java 这些 JVM 语言 可以使用 akka 这个 Actor 框架,对于 Python 来说,我们并没有什么标准做法,我们认为我们只是需要一个轻量的 Actor 框架就可以满足我们使用,我们不需要 akka 里面一些高阶的功能。因此,我们开发了 Mars actors,一个轻量的 Actor 框架,我们 Mars 整个分布式的 schedulers 和 workers 都在 Mars actors 层之上。 这是我们 Mars actors 的架构图,在启动 Actor pool 的时候,我们子进程会根据并发启动若干子进程。主进程上有 socket handler 来接受远程 socket 连接传递消息,另外主进程有个 Dispatcher 对象,用来根据消息的目的地来进行分发。我们所有的 Actor 都在子进程上创建,当 Actor 收到一个消息处理时,我们会通过协程调用 Actor.on_receive(message) 方法。 一个 Actor 发送消息到另一个 Actor,分三种情况。 它们在同一个进程,那么直接通过协程调用即可。 它们在一台机器不同进程,这个消息会被序列化通过管道送到主进程的 Dispatcher,dispatcher 通过解开二进制的头部信息得到目标的进程 ID,通过对应的管道送到对应子进程,子进程通过协程触发相应 Actor 的消息处理即可。 它们在不同机器,那么当前子进程会通过 socket 把序列化的消息发送到对应机器的主进程,该机器再通过 Dispatcher 把消息送到对应子进程。 由于使用协程作为子进程内的并行方式,而协程本身在 IO 处理上有很强的性能,所以,我们的 Actor 框架在 IO 方面也会有很好的性能。 上图是裸用 Mars actors 来求解蒙特卡洛方法算 pi。这里定义两个 Actor,一个 Actor 是 ChunkInside,它接受一个 chunk 的大小,来计算落在圆内点的个数;另外一个 Actor 是 PiCaculator,它负责接受总的点个数,来创建 ChunkInside,这个例子就是直接创建 1000 个 ChunkInside,然后通过发送消息来触发他们计算。create_actor 时指定 address 可以让 Actor 分配在不同的机器上。 这里可以看到,我们裸用 Mars actors 的性能是要快过多进程版本的。 这里我们总结一下,通过使用 Mars actors,我们能不受 GIL 限制,编写分布式代码变得非常容易,它让我们 IO 变得高效,此外,因为 Actor 解耦,代码也变得更容易维护。 现在让我们看下 Mars 分布式的完整执行过程。现在有1个 client,3个 scheduler 和 5个worker。用户创建一个 session,在服务端会创建一个 SessionActor 对象,通过一致性哈希,分配到 scheduler1 上。此时,用户运行了一个 tensor,首先 SessionActor 会创建一个 GraphActor,它会 tile 粗粒度图,图上假设有三个节点,则会创建三个 OperandActor,分别分配到不同的 scheduler 上。每个 OperandActor 会控制 operand 的提交、任务状态的监督和内存的释放等操作。此时 1 和 2 的 OperandActor 发现没有依赖,并且集群资源充足,那么他们会把任务提交到相应的 worker 执行,在执行完成后,向 3 通知任务完成,3 发现 1和2 都执行完成后,因为数据在不同 worker 执行,决定好执行 worker 后,先触发数据拉取操作,然后再执行。客户端这边,通过轮询 GraphActor 得知任务完成,则会触发数据拉取到本地的操作。整个任务就完成了。 我们对 Mars 分布式做了两个 benchmark,第一个是对 36 亿数据的每个元素加一再乘以2,图中红色叉是 numpy 的执行时间,可以看到,我们比 numpy 有数倍提升,蓝色的虚线是理论运行时间,可以看到我们真实加速非常接近理论时间加速。第二个 benchmark,我们增加了数据量,来到 144 亿数据,对每个元素加1乘以2后,再求和,可以看到单机 numpy 已经不能完成任务了,此时,针对这个任务,我们也可以取得不错的加速比。 未来计划 Mars 已经在 Github 上源代码,让更多同学来一起参与共建 Mars:https://github.com/mars-project/mars 。 在后续 Mars 的开发计划上,如上文说,我们会支持 eager mode,让每一步触发执行,提升对性能不敏感的任务开发以及 debug 时的使用体验;我们会支持更多 numpy 和 scipy 接口;后续很重要的一个是,我们会提供 100% 兼容 pandas 的接口,由于利用了 mars tensor 作为基础,我们也可以提供 GPU 的支持;我们会提供兼容 scikit-learn 的机器学习的支持;我们还会提供在细粒度图上调度自定义函数和自定义类的功能,增强灵活性;最后,因为我们客户端其实并不依赖 Python,任意语言都可以序列化粗粒度图,所以我们完全可以提供多语言的客户端版本,不过这点,我们会视需求而定。 总之,开源对我们是很重要的,庞大的 scipy 技术栈的并行化,光靠我们的力量是不够的,需要大家来一起帮我们来共建。 现场图片 最后再 po 一点现场图片吧,现场观众对 Mars 的问题还是蛮多的。我大致总结下: Mars 在一些特定计算的性能,比如 SVD 分解,这里我们有和用户合作项目的一些测试数据,输入数据是 8亿*32的矩阵做 SVD 分解,分解完再矩阵乘起来和原矩阵做对比,这整个计算过程使用 100个 worker(8核),用7分钟时间算完 Mars 何时开源,我们已经开源:https://github.com/mars-project/mars Mars 开源后会不会闭源,答:不会 Mars actors 的详细工作原理 Mars 是静态图还是动态图,目前是静态图,eager mode 做完后可以支持动态图 Mars 会不会涉及深度学习,答:目前不会 Mars user group 钉钉群扫码加入。
很高兴在这里宣布我们的新项目:Mars,一个基于张量的统一分布式计算框架。我们已经在 Github 开源:https://github.com/mars-project/mars 。 背景 Python Python 是一门相当古老的语言了,如今,在数据科学计算、机器学习、以及深度学习领域,Python 越来越受欢迎。 大数据领域,由于 hadoop 和 spark 等,Java 等还是占据着比较核心的位置,但是在 spark 上也可以看到,pyspark 的用户占据很大一部分。 深度学习领域,绝大部分的库(tensorflow、pytorch、mxnet、chainer)都支持 Python 语言,且 Python 语言也是这些库上使用最广泛的语言。 对 MaxCompute 来说,Python 用户也是一股重要力量。 PyData(numpy、scipy、pandas、scikit-learn、matplotlib) Python 在数据科学领域,有非常丰富的包可以选择,下图展示了整个 Python 数据科学技术栈。 可以看到 numpy 作为基础,在其上,有 scipy 面向科学家,pandas 面向数据分析,scikit-learn 则是最著名的机器学习库,matplotlib 专注于可视化。 对 numpy 来说,其中最核心的概念就是 ndarray——多维数组,pandas、scikit-learn 等库都构建于这个数据结构基础之上。 问题 虽然 Python 在这些领域越来越流行,PyData 技术栈给数据科学家们提供了多维张量、DataFrame 上的分析和计算能力、基于二维张量的机器学习算法,但这些库都仅仅受限于单机运算,在大数据时代,数据量一大,这些库的处理能力都显得捉襟见肘。 虽然大数据时代,有各种各样基于 SQL 的计算引擎,但对科学计算领域,这些引擎都不太适合用来进行大规模的多维张量的运算操作。而且,相当一部分用户,尤其是数据科学家们,习惯于使用各种成熟的单机库,他们不希望改变自己的使用习惯,去学习一些新的库和语法。 此外,在深度学习领域,ndarray/tensor 也是最基本的数据结构,但它们仅仅限制在深度学习上,也不适合大规模的多维张量运算。 基于这些考量,我们开发了 Mars,一个基于 tensor 的统一分布式计算框架,前期我们关注怎么将 tensor 这层做到极致。 我们的工作 Mars 的核心用 python 实现,这样做的好处是能利用到现有的 Python 社区的工作,我们能充分利用 numpy、cupy、pandas 等来作为我们小的计算单元,我们能快速稳定构建我们整个系统;其次,Python 本身能轻松和 c/c++ 做继承,我们也不必担心 Python 语言本身的性能问题,我们可以对性能热点模块轻松用 c/cython 重写。 接下来,主要集中介绍 Mars tensor,即多维张量计算的部分。 Numpy API Numpy 成功的一个原因,就是其简单易用的 API。Mars tensor 在这块可以直接利用其作为我们的接口。所以在 numpy API 的基础上,用户可以写出灵活的代码,进行数据处理,甚至是实现各种算法。 下面是两段代码,分别是用 numpy 和 Mars tensor 来实现一个功能。 import numpy as np a = np.random.rand(1000, 2000) (a + 1).sum(axis=1) import mars.tensor as mt a = mt.random.rand(1000, 2000) (a + 1).sum(axis=1).execute() 这里,创建了一个 1000x2000 的随机数张量,对其中每个元素加1,并在 axis=1(行)上求和。 目前,Mars 实现了大约 70% 的 Numpy 常用接口。 可以看到,除了 import 做了替换,用户只需要通过调用 execute 来显式触发计算。通过 execute 显式触发计算的好处是,我们能对中间过程做更多的优化,来更高效地执行计算。 不过,静态图的坏处是牺牲了灵活性,增加了 debug 的难度。下个版本,我们会提供 instant/eager mode,来对每一步操作触发计算,这样,用户能更有效地进行 debug,且能利用到 Python 语言来做循环,当然性能也会有所损失。 使用 GPU 计算 Mars tensor 也支持使用 GPU 计算。对于某些张量创建的接口,我们提供了 gpu=True 的选项,来指定分配到 GPU,后续这个张量上的计算将会在 GPU 上进行。 import mars.tensor as mt a = mt.random.rand(1000, 2000, gpu=True) (a + 1).sum(axis=1).execute() 这里 a 是分配在 GPU 上,因此后续的计算在 GPU 上进行。 稀疏张量 Mars tensor 支持创建稀疏张量,不过目前 Mars tensor 还只支持二维稀疏张量。比如,我们可以创建一个稀疏的单位矩阵,通过指定 sparse=True 即可。 import mars.tensor as mt a = mt.eye(1000, sparse=True, gpu=True) b = (a + 1).sum(axis=1) 这里看到,gpu 和 sparse 选项可以同时指定。 基于 Mars tensor 的上层建筑 这部分在 Mars 里尚未实现,这里提下我们希望在 Mars 上构建的各个组件。 DataFrame 相信有部分同学也知道 PyODPS DataFrame,这个库是我们之前的一个项目,它能让用户写出类似 pandas 类似的语法,让运算在 ODPS 上进行。但 PyODPS DataFrame 由于 ODPS 本身的限制,并不能完全实现 pandas 的全部功能(如 index 等),而且语法也有不同。 基于 Mars tensor,我们提供 100% 兼容 pandas 语法的 DataFrame。使用 mars DataFrame,不会受限于单个机器的内存。这个是我们下个版本的最主要工作之一。 机器学习 scikit-learn 的一些算法的输入就是二维的 numpy ndarray。我们也会在 Mars 上提供分布式的机器学习算法。我们大致有以下三条路: scikit-learn 有些算法支持 partial_fit,因此,我们直接在每个 worker 上调用 sklearn 的算法。 提供基于 Mars 的 joblib 后端。由于 sklearn 使用 joblib 来做并行,因此,我们可以通过实现 joblib 的 backend,来让 scikit-learn 直接跑在 Mars 的分布式环境。但是,这个方法的输入仍然是 numpy ndarray,因此,总的输入数据还是受限于内存。 在 Mars tensor 的基础上实现机器学习算法,这个方法需要的工作量是最高的,但是,好处是,这些算法就能利用 Mars tensor 的能力,比如 GPU 计算。以后,我们需要更多的同学来帮我们贡献代码,共建 Mars 生态。 细粒度的函数和类 Mars 的核心,其实是一个基于 Actor 的细粒度的调度引擎。因此,实际上,用户可以写一些并行的 Python 函数和类,来进行细粒度的控制。我们可能会提供以下几种接口。 函数 用户能写普通的 Python 函数,通过 mars.remote.spawn 来将函数调度到 Mars 上来分布式运行 import mars.remote as mr def add(x, y): return x + y data = [ (1, 2), (3, 4) ] for item in data: mr.spawn(add, item[0], item[1]) 利用 mr.spawn,用户能轻松构建分布式程序。在函数里,用户也可以使用 mr.spawn,这样,用户可以写出非常精细的分布式执行程序。 类 有时候,用户需要一些有状态的类,来进行更新状态等操作,这些类在 Mars 上被称为 RemoteClass。 import mars.remote as mr class Counter(mr.RemoteClass): def __init__(self): self.value = 0 def inc(self, n=1): self.value += n counter = mr.spawn(Counter) counter.inc() 目前,这些函数和类的部分尚未实现,只是在构想中,所以届时接口可能会做调整。 内部实现 这里,我简单介绍下 Mars tensor 的内部原理。 客户端 在客户端,我们不会做任何真正的运算操作,用户写下代码,我们只会在内存里用图记录用户的操作。 对于 Mars tensor 来说,我们有两个重要的概念,operand 和 tensor,分别如下图的蓝色圆和粉色方块所示。Operand 表示算子,tensor 表示生成的多维数组。 比如,下图,用户写下这些代码,我们会依次在图上生成对应的 operand 和 tensor。 当用户显式调用 execute 的时候,我们会将这个图提交到 Mars 的分布式执行环境。 我们客户端部分,并不会对语言有任何依赖,只需要有相同的 tensor graph 序列化,因此可以用任何语言实现。下个版本我们要不要提供 Java 版本的 Mars tensor,我们还要看是不是有用户需要。 分布式执行环境 Mars 本质上是一个对细粒度图的执行调度系统。 对于 Mars tensor 来说,我们接收到了客户端的 tensor 级别的图(粗粒度),我们要尝试将其转化成 chunk 级别的图(细粒度)。每个 chunk 以及其输入,在执行时,都应当能被内存放下。我们称这个过程叫做 tile。 在拿到细粒度的 chunk 级别的图后,我们会将这个图上的 Operand 分配到各个 worker 上去执行。 总结 Mars 在九月份的云栖大会发布,目前我们已经在 Github 开源:https://github.com/mars-project/mars 。我们项目完全以开源的方式运作,而不是简单把代码放出来。 期待有更多的同学能参与 Mars,共建 Mars。 努力了很久,我们不会甘于做一个平庸的项目,我们期待对世界做出一点微小的贡献——我们的征途是星辰大海! Mars user group 钉钉群扫码加入。
背景 PyODPS DataFrame 提供了类似 pandas 的接口,来操作 ODPS 数据,同时也支持在本地使用 pandas,和使用数据库来执行。 PyODPS DataFrame 除了支持类似 pandas 的 map 和 apply 方法,也提供了 MapReduce API 来扩展 pandas 语法以适应大数据环境。 PyODPS 的自定义函数是序列化到 MaxCompute 上执行的,MaxCompute 的 Python 环境只包含了 numpy 这一个第三方包,用户常常问的问题是,如何在自定义函数里使用 pandas、scipy 或者 scikit-learn 这样的包含c代码的库? 现在,MaxCompute 在 sprint 27 及更高版本的 isolation,让在自定义函数中使用这些包成为可能。同时,PyODPS也需要至少0.7.4版本 。接下来我会详细介绍使用步骤。 步骤 上传第三方包(只需做一次) 这个步骤只需要做一次,当 MaxCompute 资源里有了这些包,这一步直接跳过。 现在这些主流的 Python 包都提供了 whl 包,提供了各平台包含二进制文件的包,因此找到能在 MaxCompute 上能运行的包是第一步。 其次,要想在 MaxCompute 上运行,需要包含所有的依赖包,这个是比较繁琐的。我们可以看下各个包的依赖情况(删除表示已经包含) 包名 依赖 pandas numpy, python-dateutil, pytz, six scipy numpy scikit-learn numpy, scipy 所以,我们一共需要上传 python-dateutil、pytz、pandas、scipy、sklearn、six 这六个包,就能保证 pandas、scipy 和 scikit-learn 可用。 我们直接通过 http://mirrors.aliyun.com/pypi/simple 来找包。首先是 python-dateutils:http://mirrors.aliyun.com/pypi/simple/python-dateutil/ 。我们找到最新版,这个包是纯 Python 的包,我们找到最新版的 zip 包,python-dateutil-2.6.0.zip,下载。 重命名为 python-dateutil.zip,通过 MaxCompute Console 上传资源。 add archive python-dateutil.zip; pytz 一样,找到 pytz-2017.2.zip。上传不表。 six 找到 six-1.11.0.tar.gz。 接下来,是pandas,对于这种包含c的包,我们一定要找 名字中包含cp27-cp27m-manylinux1_x86_64 的whl包,这样才能在 MaxCompute 上正确执行。因此,这样我们找到最新版的包是:pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl。 这里我们把后缀改成zip,上传。 add archive pandas.zip; 其他包也是一样,因此,我们把它们都列出来: 包名 文件名 上传资源名 python-dateutil python-dateutil-2.6.0.zip python-dateutil.zip pytz pytz-2017.2.zip pytz.zip six six-1.11.0.tar.gz six.tar.gz pandas pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip pandas.zip scipy scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip scipy.zip scikit-learn scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip sklearn.zip 至此,全部包上传都已完成。 当然,我们全部上传也可以使用 PyODPS 的资源上传接口来完成,同样只需要操作一遍即可。至于用哪个,看个人喜好了。 编写代码验证 我们写一个简单的函数,里面用到了所有的库,最好是在函数里来 import 这些第三方库。 def test(x): from sklearn import datasets, svm from scipy import misc import numpy as np iris = datasets.load_iris() assert iris.data.shape == (150, 4) assert np.array_equal(np.unique(iris.target), [0, 1, 2]) clf = svm.LinearSVC() clf.fit(iris.data, iris.target) pred = clf.predict([[5.0, 3.6, 1.3, 0.25]]) assert pred[0] == 0 assert misc.face().shape is not None return x 这段代码只是示例,目标是用到以上所说的所有的包。 写完函数后,我们写一个简单的 map,记住, 运行时要确保 isolation 打开 ,如果不在 project 级别打开,可以在运行时打开,一个可以设置全局的选项: from odps import options options.sql.settings = {'odps.isolation.session.enable': True} 也可以在 execute 方法上指定本次执行打开 isolation。 同样,我们可以在全局通过 options.df.libraries 指定用到的包,也可以在 execute 时指定。这里,我们要指定所有的包,包括依赖。下面就是调用刚刚定义的函数的例子。 hints = { 'odps.isolation.session.enable': True } libraries = ['python-dateutil.zip', 'pytz.zip', 'six.tar.gz', 'pandas.zip', 'scipy.zip', 'sklearn.zip'] iris = o.get_table('pyodps_iris').to_df() print iris[:1].sepal_length.map(test).execute(hints=hints, libraries=libraries) 可以看到,我们的函数顺利执行。 总结 对于要用到的第三方库及其依赖,如果已经上传,则可以直接编写代码,并指定用到的 libraries 即可;否则,需要按照教程上传第三方库。 可以看到,当第一步上传包做过后,以后每次使用都是优雅的,只需指定 libraries 就可以了。 PyODPS 相关资源 文档:http://pyodps.readthedocs.io/zh_CN/latest/ 代码:https://github.com/aliyun/aliyun-odps-python-sdk ,欢迎提 issue 和 pull request MaxCompute 钉钉群
新版 MaxCompute Isolation Session 支持 Python UDF。也就是说,Python UDF 中已经可以跑二进制包。刚才以 Scipy 为例踩了一下坑,把相关的过程分享出来。 下载 Scipy 包并上传资源 首先,从 PyPI 或其他镜像下载 Scipy 包。你需要下载后缀为“cp27-cp27m-manylinux1_x86_64.whl”的包,其他的包会无法加载,包括名为“cp27-cp27mu”的包。以下的截图来自 https://pypi.python.org/pypi/scipy ,仅有打勾的包可以直接使用: 下载 whl 后,将文件名更改为 scipy.zip。此后,在 MaxCompute Console 中执行 add archive scipy.zip; 此后,scipy.zip 即被创建为 MaxCompute Archive 资源。不建议使用其他类型的资源,因为在执行时,MaxCompute 会自动解压 Archive 类型的资源,从而省去手动解压的步骤。 从非 Whl 包生成 Whl 包 如果列出的包中包含 Whl,则可以直接上传并跳过此步骤。如果列出的包不包含 whl(如手中仅有图中的 scipy-0.19.0.zip),需要在 Linux 环境中手动编译并打包为 whl。打包前,需要确保下列命令返回“cp27m”而不是“cp27mu”: python -c "import pip; print pip.pep425tags.get_abi_tag()" 如果返回值为“cp27mu”,你需要使用 “--enable-unicode=no" 选项编译一个可用的 Python 2.7,再使用编译得到的 Python。如果返回值正确,通常可以在该环境下使用 python setup.py bdist_wheel 完成,具体请参考各个包的编译/安装说明。 打包完成后,将生成的 whl 包上传。 编写和创建 UDF 我们需要编写一个 UDF 支持计算 psi。编写下列代码: from odps.udf import annotate from odps.distcache import get_cache_archive def include_package_path(res_name): import os, sys archive_files = get_cache_archive(res_name) dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files if '.dist_info' not in f.name], key=lambda v: len(v)) sys.path.append(os.path.dirname(dir_names[0])) @annotate("double->double") class MyPsi(object): def __init__(self): include_package_path('scipy.zip') def evaluate(self, arg0): from scipy.special import psi return float(psi(arg0)) 这里有必要解释一下 include_package_path 这个函数。get_cache_archive 返回一个包含包中所有文件的文件对象。我们首先取出所有的文件名,此后获得最短的路径作为包的路径,并加入 sys.path。此后,便可以正常 import scipy 这个包。 需要注意的是,因为 MaxCompute 会在执行前通过原有的沙箱检查 UDF 的输入/输出,因而 include_package_path 和 import 在函数外调用会报错。 编写完成后,将代码保存为 my_psi.py,并在 MaxCompute Console 中执行 add py my_psi.py; 此后创建函数。在 MaxCompute Console 中输入 create function my_psi as my_psi.MyPsi using my_psi.py,scipy.zip; 注意在 create function 时,不要忘记加上刚才上传的包,例如上面的 scipy.zip。 执行 创建 UDF 后,便可以在 MaxCompute Console 中执行查询(暂不支持 pypy,因而需禁用 pypy): set odps.pypy.enabled=false; set odps.isolation.session.enable = true; select my_psi(sepal_length) from iris; 其他 如果包依赖了其他 Python 包,需要一并上传并同时加入到 UDF 依赖中。 使用 0.7.4 以上的 PyODPS DataFrame 可以简化使用二进制包的 UDF 的编写,无需手动调用 include_package_path,具体可见 http://pyodps.readthedocs.io/zh_CN/latest/df-element-zh.html#third-party-library 。 本人没有进行更深入的使用,相关问题请提工单提问,或者加入 MaxCompute 钉钉群讨论。 MaxCompute 钉钉群
PyODPS 中使用 Python UDF 包含两方面,一个是直接使用,也就是在 MaxCompute SQL 中使用;一个是间接的方式,也就是 PyODPS DataFrame,这种方式你不需要直接写 Python UDF,而是写普通的 Python 函数或者类。下面我们分开说明。 作为准备工作,我们需要 ODPS 入口,可以通过直接初始化,或者使用 room 机制 加载。 from odps import ODPS o = ODPS('your-access-id', 'your-access-key', 'your-project') MaxCompute SQL 中使用 Python UDF 首先,我们需要写一个 Python 文件,假设我们就是把某一列按 csv 格式放的一列转成 json 格式。 import json from odps.udf import annotate @annotate('string->string') class Transform(object): def evaluate(self, x): columns = list('abc') d = dict(zip(columns, x.split(','))) return json.dumps(d) 假设这个文件叫 my.py,接下来我们就需要创建 py 资源。 r = o.create_resource('csv_to_json.py', 'py', fileobj=open('my.py')) fileobj 参数也可以是 str 类型,就是表示文件的内容 接着我们就可以创建 Python UDF 了。 o.create_function('csv_to_json', class_type='csv_to_json.Transform', resources=[r]) 这里我们指定了函数名叫 csv_to_json,主类使我们上传的 csv_to_json.py 文件里的 Transform 类。 现在我们就可以在 MaxCompute SQL 中调用这个 UDF 了。 o.execute_sql('select csv_to_json(raw) from pyodps_test_udf') 这样我们就完成了在 PyODPS 中使用 MaxCompute SQL + Python UDF 的整个过程。 PyODPS DataFrame 对于 PyODPS DataFrame 来说,用户只需要写普通的 Python 函数或者类,在函数或者类里,甚至可以读取全局变量,这样给开发带来了极大的方便。 和上面的例子目标相同,我们定义一个 transform 函数即可。然后我们对于 DataFrame 的一列调用 map 方法来应用这个函数。 passed_columns = list('abc') # 可以从数据库中读取或者写死 def transform(x): import json d = dict(zip(passed_columns, x.split(','))) return json.dumps(d) df.raw.map(transform) In [30]: df raw 0 1,2,3 1 4,5,6 2 7,8,9 In [31]: df.raw.map(transform) raw 0 {"a": "1", "c": "3", "b": "2"} 1 {"a": "4", "c": "6", "b": "5"} 2 {"a": "7", "c": "9", "b": "8"} 实际上,PyODPS DataFrame 在用 MaxCompute 执行的时候,也会创建 Python UDF 来实现这个功能,但用户不需要去创建文件、资源和函数这些过程,一切都是 Python 原生函数和类,整个过程相当顺畅。 另外可以看到,在上面的 my.py 里,我们也是定义了一个 columns 参数的,而如果这个参数是通过变量传进去的话,在 Python UDF 里非常麻烦,可能常常需要用一些 tricky 的方法,比如写到某个文件资源,然后在 UDF 里读取之类的。而对于 DataFrame 来说,完全没有这个问题,我们可以自由读取全局变量。 不过要注意的是,这个全局变量是被序列化到各个机器上的,所以你修改它不会全局生效。 好了,还有什么问题可以随时和我们取得联系。 文档:http://pyodps.readthedocs.io/zh_CN/latest/ 代码:https://github.com/aliyun/aliyun-odps-python-sdk ,欢迎提 issue 和 merge request
本文将用户安装 PyODPS 时遇到的常见问题列举如下,希望在遇到类似问题时可供借鉴。在参考下列步骤之前,请首先尝试卸载并重装 PyODPS。卸载的步骤为执行“pip uninstall pyodps”。 1. Warning: XXX not installed, ...... Warning 不是 Error,Warning 不是 Error,Warning 不是 Error,重要的事情说三遍! 参考 Warning 文字,它将告诉你需要什么组件,你可以使用 pip 命令进行安装。 2. 安装后 import odps 报 no module named odps 这说明 odps package 无法被加载。这里有几种可能对的情形 安装有多个 Python 版本。这常见于 Mac 下使用 Homebrew 安装新的 Python 包(而不是系统自带的 Python),或者同时安装了 Python 2 和 Python 3,或者同时安装了 CPython 和 Anaconda / Miniconda,而当前使用的 Python 下并未安装 PyODPS。为确认此问题,请遵循下列步骤: 打开 Python 命令行; 执行 import site; print(site.getsitepackages()); 检查输出的每个路径中是否存在名为“odps”的文件夹。若不存在,确认此问题。 如果问题确认,请使用命令行定位到需要安装的 Python 路径,然后执行 ./python -m pip install pyodps[full]。 在 Mac / Linux 下可使用 find / -regex '.*/python[^\./-]*' 2>/dev/null 查找本机到底安装了几个 Python。 Search Path(通常是当前目录)中包含一个 odps.py 或一个包含 __init__.py 的名为 odps 的文件夹。该文件可能是你自己不小心创建的,或者曾经安装过一个名为“odps”的 Python 包。对于前者,请检查后将该文件更名。对于后者,请尝试使用“pip uninstall odps”进行删除。 3. 安装后 "from odps import *" 报 cannot import name ODPS 首先需要检查当前工作路径下是否存在一个文件名为“odps.py”的文件。如果存在,请改名后再 import,否则可参照 2 中的步骤进行排查。 4. 安装后 import odps 报 cannot import module odps / odps.xxxx 这通常是由于 PyODPS 遇到了依赖问题。请联系 PyODPS 技术支持钉钉群(11701793),我们将尝试修复该问题。 尝试pip install -U jupyter可以解决此场景下的大多数问题。 5. 安装时报 Syntax Error Python 版本过低。PyODPS 主流支持 Python 2.7.6+ / Python 3.3+,并同时支持 Python 2.6。Python 2.5 及以下版本不被支持。建议使用主流支持的版本。 6. Mac 上安装时报 Permission denied 尝试使用 sudo pip install pyodps 7. Mac 上 sudo 安装仍然报 Operation not permitted 这是由于系统完整性保护导致的。参考 http://stackoverflow.com/questions/32659348/operation-not-permitted-when-on-root-el-capitan-rootless-disabled ,重启机器,并在重启中按 ⌘+R。此后在终端中运行 csrutil disable reboot 此后再行安装。 8. 使用时报 sourceIP is not in the white list 存在白名单保护,可咨询 Project Owner。 9. Jupyter 前端 UI 有问题 可以尝试卸载并重新安装 jupyter、ipywidgets 以及 widgetsnbextension,也可以尝试在 bash 执行 jupyter nbextension enable pyodps/main 如有问题,PyODPS 技术支持钉钉群,群号 11701793
PyODPS 支持用 Python 来对 MaxCompute 对象进行操作,它提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,并且可以用 ml 模块来执行机器学习算法。 现在为了让大家能更好地使用 PyODPS,我们总结开发过程中的最佳实践,来让大家更高效地开发 PyODPS 程序。当然,希望大家能一起来帮助我们来完善总结。 除非数据量很小,否则不要试图进行本地数据处理 我们 PyODPS 提供了多种方便拉取数据到本地的操作,因此,很多用户会试图把数据拉取到本地处理,然后再上传到 ODPS 上。 很多时候,用户其实根本不清楚这种操作的低效,拉取到本地彻底丧失了 MaxCompute 的大规模并行能力。而有的用户仅仅是需要对单行数据应用一个 Python 函数,或者试图做一行变多行的操作,这些操作,用 PyODPS DataFrame 都能轻松完成,并且完全利用到了 MaxCompute 的并行计算能力。 比如说现在我有一份数据,都是 json 串,现在我想把 json 串按 key-value 对展开成一行。则可以写一个简单的函数。 In [12]: df json 0 {"a": 1, "b": 2} 1 {"c": 4, "b": 3} In [14]: from odps.df import output In [16]: @output(['k', 'v'], ['string', 'int']) ...: def h(row): ...: import json ...: for k, v in json.loads(row.json).items(): ...: yield k, v ...: In [21]: df.apply(h, axis=1) k v 0 a 1 1 b 2 2 c 4 3 b 3 而这些操作,几乎全部都可以用 apply(axis=1)和 map_reduce 接口完成。 使用 pandas 计算后端进行高效本地 debug PyODPS DataFrame 能够根据数据来源来决定如何执行,比如,通过 pandas DataFrame 创建的 PyODPS DataFrame 则可以使用 pandas 执行本地计算;而使用 MaxCompute 表创建的 DataFrame 则可以在 MaxCompute 上执行。 而这两种方式,除了初始化不同,后续代码完全一致,因此,我们可以利用这点来进行本地 debug。 所以我们可以写出如下的代码: df = o.get_table('movielens_ratings').to_df() DEBUG = True if DEBUG: df = df[:100].to_pandas(wrap=True) to_pandas 是将数据下载,根据 wrap 参数来决定是否返回 PyODPS DataFrame,如果是 True,则返回 PyODPS DataFrame;否则,返回 pandas DataFrame。 当我们把所有后续代码都编写完成,本地的测试速度就非常快,当测试结束后,我们就可以把 debug 改为 False,这样后续就能在 ODPS 上执行全量的计算。 使用本地调试还有个好处,就是能利用到 IDE 的如断点和单步调试自定义函数的功能。要知道,在 ODPS 上执行,是把函数序列化到远端去执行,所以本地是没法断点进入的。而使用本地进行调试时,则可以断点进入自定义函数,方便进行调试。 推荐大家使用 MaxCompute studio 来本地调试 PyODPS 程序。 利用 Python 语言特性来实现丰富的功能 编写 Python 函数 一个常见的例子就是,计算两点之间的距离,有多种计算方法,比如欧氏距离、曼哈顿距离等等,我们可以定义一系列函数,在计算时就可以根据具体情况调用相应的函数即可。 def euclidean_distance(from_x, from_y, to_x, to_y): return ((from_x - to_x) ** 2 + (from_y - to_y) ** 2).sqrt() def manhattan_distance(center_x, center_y, x, y): return (from_x - to_x).abs() + (from_y - to_y).abs() 调用则如下: In [42]: df from_x from_y to_x to_y 0 0.393094 0.427736 0.463035 0.105007 1 0.629571 0.364047 0.972390 0.081533 2 0.460626 0.530383 0.443177 0.706774 3 0.647776 0.192169 0.244621 0.447979 4 0.846044 0.153819 0.873813 0.257627 5 0.702269 0.363977 0.440960 0.639756 6 0.596976 0.978124 0.669283 0.936233 7 0.376831 0.461660 0.707208 0.216863 8 0.632239 0.519418 0.881574 0.972641 9 0.071466 0.294414 0.012949 0.368514 In [43]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance') distance 0 0.330221 1 0.444229 2 0.177253 3 0.477465 4 0.107458 5 0.379916 6 0.083565 7 0.411187 8 0.517280 9 0.094420 In [44]: manhattan_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance') distance 0 0.392670 1 0.625334 2 0.193841 3 0.658966 4 0.131577 5 0.537088 6 0.114198 7 0.575175 8 0.702558 9 0.132617 利用 Python 语言的条件和循环语句 一个常见的需求是,用户有大概30张表,需要合成一张表,这个时候如果写 SQL,需要写 union all 30张表,如果表的数量更多,会更让人崩溃。使用 PyODPS,只需要一句话就搞定了。 table_names = ['table1', ..., 'tableN'] dfs = [o.get_table(tn).to_df() for tn in table_names] reduce(lambda x, y: x.union(y), dfs) 大功告成。稍微解释下,这里的 reduce 这句等价于: df = dfs[0] for other_df in dfs[1:]: df = df.union(other_df) 稍微扩展下,经常有一些 case 是这样,用户要计算的表保存在某个地方,比如说数据库,需要根据配置来对表的字段进行处理,然后对所有表进行 union 或者 join 操作。这个时候,用 SQL 实现可能是相当复杂的,但是用 DataFrame 进行处理会非常简单,而实际上我们就有用户用 PyODPS 解决了这样的问题。 尽量使用内建算子,而不是自定义函数 比如上文提到的欧氏距离的计算,实际上,计算的过程都是使用的 DataFrame 的内建算子,比如说指数和 sqrt 等操作,如果我们对一行数据应用自定义函数,则会发现,速度会慢很多。 In [54]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance').mean() |==========================================| 1 / 1 (100.00%) 7s 0.5216082314224464 In [55]: @output(['distance'], ['float']) ...: def euclidean_distance2(row): ...: import math ...: return math.sqrt((row.from_x - row.to_x) ** 2 + (row.from_y - row.to_y) ** 2) ...: In [56]: df.apply(euclidean_distance2, axis=1, reduce=True).mean() |==========================================| 1 / 1 (100.00%) 27s 0.5216082314224464 可以看到,当我们对一行应用了自定义函数后,执行时间从7秒延长到了27秒,这个数据只是1百万行数据计算的结果,如果有更大的数据集,更复杂的操作,时间的差距可能会更长。 总结 利用 PyODPS,我们其实能挖掘更多更灵活、更高效操作 MaxCompute 数据的方式。最佳实践可以不光是我们提供的一些建议,如果你有更多好玩有用的实践,可以多多分享出来。 文档:http://pyodps.readthedocs.io/ 代码:https://github.com/aliyun/aliyun-odps-python-sdk ,欢迎提 issue 和 merge request 钉钉群:11701793
前几天,PyODPS发布了0.7版本,这篇文章给大家介绍下PyODPS新版本带来的重要特性。 之前也有若干篇文章介绍过了,我们PyODPS DataFrame是延迟执行的,在调用立即执行的方法,比如execute、persist等之前,都只是构建了表达式。而真正的执行根据具体的输入数据,来决定执行的后端。 比如,我们可以根据输入是pandas DataFrame(本地数据),还是MaxCompute Table(MaxCompute数据)来决定是在本地执行,还是在MaxComput上执行。 In [1]: import pandas as pd In [2]: pd_df = pd.DataFrame({'a': range(3)}) In [3]: from odps.df import DataFrame In [4]: df = DataFrame(pd_df) # 本地数据 In [5]: df.a.sum() |==========================================| 1 / 1 (100.00%) 0s 3 In [6]: %load_ext odps In [7]: %enter Out[7]: <odps.inter.Room at 0x105951990> In [8]: df = DataFrame(o.get_table('pyodps_iris')) # MaxCompute数据 In [9]: df.sepal_width.sum() |==========================================| 1 / 1 (100.00%) 15s 458.10000000000014 数据库执行 来到了0.7版本,我们的后端武器库进一步扩充,现在我们支持Postgresql和MySQL,原则上我们支持所有的主流数据库,但我们只在这两个数据库上做了测试。 我们的数据库执行后端使用 sqlalchemy 实现,想要执行还需要对应数据库的driver。 现在,如果DataFrame输入的数据是sqlalchemy Table,那么我们就可以使用数据库后端来执行。 In [24]: mysql_engine = sqlalchemy.create_engine('mysql://root:123@localhost/movielens') In [25]: metadata = sqlalchemy.MetaData(bind=mysql_engine) # 需要绑定engine In [26]: table = sqlalchemy.Table('top_users', metadata, extend_existing=True, autoload=True) In [27]: top_users = DataFrame(table) In [28]: top_users.age.sum() |==========================================| 1 / 1 (100.00%) 0s 763 对于postgresql也是一样。 值得注意的是,现在还有部分DataFrame操作,比如自定义函数尚未支持数据库后端 。 可以看到,PyODPS DataFrame就是一个统一的数据查询语言,用户不需要改写一行代码,就可以根据输入让数据在MaxCompute、本地和数据库上执行,由于DataFrame框架的灵活性,我们甚至还可以扩展出非SQL执行后端的支持。 JOIN或者UNION数据库和MaxCompute数据 过去 一篇文章 提到过,我们可以join或者union本地和MaxCompute上的数据,这样的典型场景就是,比如我有个本地excel文件,我可以轻松读取成本地DataFrame,然后直接就可以和MaxCompute数据进行操作,省去了一大堆麻烦的过程。 现在,我们也同样可以join 数据库和MaxCompute上的数据,试想,有一堆用户数据是在数据库中进行处理,然后我们无需经过同步数据等繁琐的过程,我们就可以直接join 数据库和MaxCompute上的数据,这是何其方便的事情。 比如: In [29]: ratings = o.get_table('movielens_ratings').to_df() In [32]: female_top_users = top_users[top_users.sex == 'F'] # MySQL中的数据 In [33]: ratings.join(female_top_users).rating.mean() |==========================================| 1 / 1 (100.00%) 14s 2.9451170298627924 总结 我们PyODPS一直处在快速迭代的过程中,我们所有所做的努力,都是为了让大家以更好的体验来进行数据分析和机器学习。尽管我们很努力,但精力毕竟有限,难免会有bug,会有功能不完善。希望大家能给我们提issue,能贡献代码就更好啦。 项目文档:http://pyodps.readthedocs.io项目地址:https://github.com/aliyun/aliyun-odps-python-sdk提issue:https://github.com/aliyun/aliyun-odps-python-sdk/issues钉钉扫码:
PyCon China 2016上海分会场对PyODPS做了简单介绍,并介绍了下PyODPS DataFrame的实现原理,现在把PPT贴出来。
最近已经写了几篇有关PyODPS DataFrame的文章,但是还是有些同学不明白PyODPS DataFrame是什么,以及能做什么事情。这篇文章,我会做出解释,以及简单介绍一下实现的原理。 PyODPS DataFrame 首先什么是DataFrame,我在以前的文章也解释过,我们可以把它认为是二维表结构。在单机上,数据科学家一般会使用R或者Python库pandas来做数据分析,DataFrame是它们上广泛使用的数据结构。在DataFrame上,我们可以做过滤、列筛选、join、union等等操作。 因此,DataFrame也常常拿来和SQL做比较。我觉得主要的区别有: 可能每个系统都有自己的SQL语法,但是对于DataFrame来说,可以把一套语法应用到不同的系统中,也就是说,各个系统上层的DataFrame语法可以是一致的。 DataFrame可以和本身的实现语言相关,因此能用到语言相关的特性,变量赋值、和语言三方库集成等等都不在话下。 因此从第一点上来说,就能解释为什么我们的PyODPS DataFrame能在ODPS和本地上执行了。同样的语法,灵活性很高。 对于PyODPS DataFrame来说,什么时候数据在MaxCompute上执行,什么时候在本地执行呢?这和用户的初始输入有关。当用户用MaxCompute表来初始化DataFrame的时候,后续计算就会在MaxCompute上执行。 In [5]: iris = DataFrame(o.get_table('pyodps_iris')) In [6]: iris[iris.sepalwidth < 4].head(3) |==========================================| 1 / 1 (100.00%) 19s Out[6]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 比如例子里,我们用一张MaxCompute表来初始化,因此后续的计算都是在MaxCompute上执行的。而如果用pandas DataFrame来初始化,那么后续的计算就是在本地执行。 我们前面一篇文章提过,我们从0.4版本开始带来一个特性,我们能join SQL和本地数据时,具体是怎样实现的呢?其实很简单,我们先把本地数据都计算完成,然后通过Tunnel上传到ODPS,再执行ODPS上的计算。 在ODPS上计算时,和本地的计算能力是无关的,除非获取最终计算结果,数据也不会放在本地。 对于在ODPS上的计算,目前来说,我们绝大多数的操作会使用ODPS SQL执行,但有部分情况,我们会使用tunnel执行,以提高执行速度。这些包括: 对原始表筛选字段 非分区表的切片,或分区表不选取分区或者选取前几个分区字段的切片 非分区表取条数总数,或分区表选取分区的条数总数 举个例子,我们的pyodps_iris是个非分区表,以下情况会使用tunnel,而不是转化成SQL执行。 In [7]: iris.count() |==========================================| 1 / 1 (100.00%) 0s 150 In [10]: iris.exclude('name')[:3] |==========================================| 1 / 1 (100.00%) 0s sepallength sepalwidth petallength petalwidth 0 5.1 3.5 1.4 0.2 1 4.9 3.0 1.4 0.2 2 4.7 3.2 1.3 0.2 可以看到,使用Tunnel的计算是很快的。因此,我们可以利用这个特性来从ODPS上下载少量数据,来利用本地计算来做debug。 In [6]: iris[iris.sepalwidth < 4].head(3) # 利用ODPS计算时,对小数据量是没有优势的 |==========================================| 1 / 1 (100.00%) 19s Out[6]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa In [11]: local_iris = iris[:100].to_pandas(wrap=True) |==========================================| 1 / 1 (100.00%) 0s In [12]: local_iris[local_iris.sepalwidth < 4].head(3) |==========================================| 1 / 1 (100.00%) 0s Out[12]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa wrap为True时,等同于DataFrame(iris[:100].to_pandas())。 原理简述 下面,简单来说下PyODPS DataFrame的计算原理。 在某种意义上,PyODPS DataFrame可以认为是DSL(领域特定语言)。在到立即执行的操作(如execute)前,得到的都是一个AST(抽象语法树)。 在交互式环境下,为了方便,我们在repr一个对象时,里面会调用立即执行的方法。因此,我们先把这个选项关掉,来看看执行后会是什么。 In [13]: options.interactive = False In [14]: iris[iris.sepalwidth < 4][:10] Out[14]: Collection: ref_0 odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double name : string Collection: ref_1 Filter[collection] collection: ref_0 predicate: Less[sequence(boolean)] sepalwidth = Column[sequence(float64)] 'sepalwidth' from collection ref_0 Scalar[int8] 4 Slice[collection] collection: ref_1 stop: Scalar[int8] 10 现在我们把verbose打开,执行的中间过程会被打印出来,我们可以看到在ODPS上,目前会把这个AST给compile成ODPS SQL来执行。 In [15]: options.verbose = True In [16]: iris[iris.sepalwidth < 4][:10].execute() Sql compiled: CREATE TABLE tmp_pyodps_07ec2ed0_88c5_4649_9413_0bce14f72d6f LIFECYCLE 1 AS SELECT * FROM odps_test_sqltask_finance.`pyodps_iris` t1 WHERE t1.`sepalwidth` < 4 LIMIT 10 logview: http://webconsole.odps.aliyun-inc.com:8080/logview/?*** |==========================================| 1 / 1 (100.00%) 32s Out[16]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 3 4.6 3.1 1.5 0.2 Iris-setosa 4 5.0 3.6 1.4 0.2 Iris-setosa 5 5.4 3.9 1.7 0.4 Iris-setosa 6 4.6 3.4 1.4 0.3 Iris-setosa 7 5.0 3.4 1.5 0.2 Iris-setosa 8 4.4 2.9 1.4 0.2 Iris-setosa 9 4.9 3.1 1.5 0.1 Iris-setosa 而对于本地数据,我们在compile阶段会把AST转化成一个执行DAG(有向无环图),在执行阶段,会按照DAG的拓扑顺序来执行,得到最终结果。 In [17]: local_iris[local_iris.sepalwidth < 4][:10].compile() Out[17]: <odps.dag.DAG at 0x10c233950> 好了,至此,已经简单说明了PyODPS DataFrame框架的执行原理。 PyODPS还很年轻,期待大家来使用、提feature、贡献代码。 安装方法:pip install pyodps Github:https://github.com/aliyun/aliyun-odps-python-sdk 外部文档:http://pyodps.readthedocs.io/ bug report:https://github.com/aliyun/aliyun-odps-python-sdk/issues
在MaxCompute上,大家有很多种分析和机器学习的方式。大家可以用在数加的web界面编写SQL,提交SQL作业;可以用console直接执行SQL,等等等。那机器学习呢,大家需要通过PAI命令提交PAI任务,或者在xlab上操作xlib;画图呢?导出数据绘图或者使用xlab。而这一切工具,都是割裂的,你不得不在各个地方进行切换,而且,也没有传统的数据分析和机器学习的快感。 那传统的任务是怎么做的呢,使用RStudio或者jupyter notebook(前身是ipython notebook),R我不熟,但对于Pythoner,用pandas进行数据分析、绘图,再用scikit-learn执行机器学习算法,在一个notebook里,能做所有想做的事情,非常高效。 现在呢,整合这一切的就是PyODPS,我们包含有基础MaxCompute SDK,因此一切对MaxCompute模型的操作你都可以。除此之外,我们还包括了DataFrame框架,和机器学习模块,这一切操作都进行了整合。 话不多说,直接上截图。 DataFrame执行的过程。 这个是我们的任务执行的详细过程,我们的任务执行包含一定的显示信息,亦能轻松跳转到logview来查看明细。执行完成也有通知。 下面是我们完整的使用SQL、DataFrame和ML机器学习的过程。 而这一切,你自己也可以在本地完成。你只需要: pip install pyodps[full] 然后随意到一个目录下,启动jupyter notebook jupyter notebook 就可以体验和使用PyODPS哦。 我们的文档在:http://pyodps.readthedocs.io欢迎吐槽。
有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。 “帮我查下这些用户的消耗呢”。 开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。 “有点麻烦啊”,开发同学轻轻抱怨。 “我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。 “……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。 这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。 但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。 为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。 可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。 首先,我们读出本地Excel文件。 In [14]: from odps.df import read_excel In [15]: users = read_excel('/Users/chine/userids.xlsx') In [16]: users.head(10) |==========================================| 1 / 1 (100.00%) 0s Out[16]: id age 0 46 27 1 917 22 2 217 22 3 889 24 4 792 40 5 267 23 6 626 23 7 433 27 8 751 24 9 932 58 In [40]: users.count() |==========================================| 1 / 1 (100.00%) 0s 100 然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。 In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings')) In [18]: ratings.head(10) |==========================================| 1 / 1 (100.00%) 2s Out[18]: user_id movie_id rating unix_timestamp 0 196 242 3 881250949 1 186 302 3 891717742 2 22 377 1 878887116 3 244 51 2 880606923 4 166 346 1 886397596 5 298 474 4 884182806 6 115 265 2 881171488 7 253 465 5 891628467 8 305 451 3 886324817 9 6 86 3 883603013 In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age] # 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection In [26]: filter_ratings.head(10) |==========================================| 1 / 1 (100.00%) 44s Out[26]: user_id movie_id rating unix_timestamp age 0 3 350 3 889237076 23 1 3 332 1 889237224 23 2 3 327 4 889237455 23 3 3 341 1 889237055 23 4 3 317 2 889237482 23 5 3 336 1 889237198 23 6 3 322 3 889237269 23 7 3 323 2 889237269 23 8 3 339 3 889237141 23 9 3 268 3 889236961 23 然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。 In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean()) In [29]: age_ratings.head(10) |==========================================| 1 / 1 (100.00%) 30s Out[29]: age rating_mean 0 20 4.002309 1 21 4.051643 2 22 3.227513 3 23 3.519174 4 24 3.481013 5 25 3.774744 6 26 3.391509 7 27 3.355130 8 28 3.382883 9 29 3.705660 In [30]: age_ratings.plot(kind='bar', rot=45) |==========================================| 1 / 1 (100.00%) 29s Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10> 超级简单,有木有! 这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。 而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。 DataFrame API使用pandas计算 我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。 所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。 In [41]: import numpy as np In [42]: import pandas as pd In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc')) In [45]: pandas_df Out[45]: a b c 0 0.583845 0.301504 0.764223 1 0.153269 0.335511 0.455193 2 0.725460 0.460367 0.294741 3 0.315234 0.907264 0.849361 4 0.678395 0.642199 0.746051 5 0.977872 0.841084 0.931561 6 0.903927 0.846036 0.982424 7 0.347098 0.373247 0.193810 8 0.672611 0.242942 0.381713 9 0.461411 0.687164 0.514689 In [46]: df = DataFrame(pandas_df) In [49]: type(df) Out[49]: odps.df.core.DataFrame In [47]: df.head(3) |==========================================| 1 / 1 (100.00%) 0s Out[47]: a b c 0 0.583845 0.301504 0.764223 1 0.153269 0.335511 0.455193 2 0.725460 0.460367 0.294741 In [48]: df[df.a < 0.5].a.sum() |==========================================| 1 / 1 (100.00%) 0s 1.2770121422535428 这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。 这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以用写出以下代码: DEBUG = True if DEBUG: # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。 df = ratings[:100].to_pandas(wrap=True) else: df = ratings 在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制 。 目前pandas计算后端尚不支持窗口函数。 apply和MapReduce API 使用apply对单行数据调用自定义函数 以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。 In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10) |==========================================| 1 / 1 (100.00%) 1m44s Out[13]: rda 0 0.166667 1 0.166667 2 0.208333 3 0.208333 4 0.125000 5 0.208333 6 0.166667 7 0.208333 8 0.208333 9 0.125000 reduce为True的时候,会返回一个sequence,详细参考文档。 MapReduce API PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。 现在假设我们需要求出每部电影前两名的评分,直接上代码。 from odps.df import output @output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int']) def mapper(row): yield row.movie_id, row.title, row.rating @output(['title', 'top_rating'], ['string', 'int']) def reducer(keys): i = [0] def h(row, done): if i[0] < 2: yield row.movie_title, row.movie_rating i[0] += 1 return h In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False) In [10]: top_ratings.head(10) |==========================================| 1 / 1 (100.00%) 3m48s Out[10]: title top_rating 0 Toy Story (1995) 5 1 Toy Story (1995) 5 2 GoldenEye (1995) 5 3 GoldenEye (1995) 5 4 Four Rooms (1995) 5 5 Four Rooms (1995) 5 6 Get Shorty (1995) 5 7 Get Shorty (1995) 5 8 Copycat (1995) 5 9 Copycat (1995) 5 利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短! In [22]: local_ratings = ratings[:100].to_pandas(wrap=True) |==========================================| 1 / 1 (100.00%) 2s In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10) |==========================================| 1 / 1 (100.00%) 0s Out[23]: title top_rating 0 Shanghai Triad (Yao a yao yao dao waipo qiao) ... 5 1 Twelve Monkeys (1995) 4 2 Seven (Se7en) (1995) 4 3 Usual Suspects, The (1995) 5 4 Postino, Il (1994) 3 5 Mr. Holland's Opus (1995) 4 6 Taxi Driver (1976) 5 7 Crumb (1994) 5 8 Star Wars (1977) 5 9 Star Wars (1977) 5 cache机制 在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。 In [25]: tmpdf = ratings[ratings.title.len() > 10].cache() In [26]: tmpdf['title', 'movie_id'].head(3) |==========================================| 1 / 1 (100.00%) 35s Out[26]: title movie_id 0 Seven (Se7en) (1995) 11 1 Event Horizon (1997) 260 2 Star Wars (1977) 50 In [27]: tmpdf.count() # tmpdf已经被cache,所以我们能立刻计算出数量 |==========================================| 1 / 1 (100.00%) 0s 99823 记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。 而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。 其他特性 PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。 PyOdps下一步计划 对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。 PyOdps非常年轻,期待大家来使用、提feature、贡献代码。 安装方法:pip install pyodps Github:https://github.com/aliyun/aliyun-odps-python-sdk 文档:http://pyodps.readthedocs.org/ bug report:https://github.com/aliyun/aliyun-odps-python-sdk/issues
春节结束了,是时候来些新鲜玩意,让我们来看一些酷的东西。 当当当当:隆重推出PyOdps logo。 好像跑题了,好吧,让我们言归正传。 我们知道Python提供了一个交互式的环境,能够方便探索和试验想法。同时,IPython是Python交互环境的增强,提供了很多强大的功能;IPython Notebook(现在已经是Jupyter Notebook)则更酷,提供了一个web界面,除了提供交互环境,还是一个记录计算过程的『笔记本』。 PyOdps也提供了一系列在交互式环境下的增强工具,使得探索ODPS数据更方便快捷。 配置ODPS帐号 Python交互环境 同一个环境支持配置若干个ODPS帐号,只需要: In [1]: from odps.inter import setup In [2]: setup('**your-access_id**', '**your-access-key**', '**your-project**', endpoint='**your-endpoint**') 此时这个帐号会被配置到一个叫做default的我们称之为room的地方。以后我们再使用这个帐号只需要: In [3]: from odps.inter import enter In [4]: room = enter() In [5]: o = room.odps In [6]: o.get_table('dual') Out[6]: odps.Table name: odps_test_sqltask_finance.`dual` schema: c_int_a : bigint c_int_b : bigint c_double_a : double c_double_b : double c_string_a : string c_string_b : string c_bool_a : boolean c_bool_b : boolean c_datetime_a : datetime c_datetime_b : datetime 通过room的odps属性,我们可以取到ODPS的入口,这样就可以接着进行ODPS操作了。配置了别的room比如叫做myodps,要取到ODPS入口,只需要enter('myodps').odps即可。 list_rooms方法能列出所有的room。 In [17]: from odps.inter import list_rooms In [18]: list_rooms() Out[18]: ['default', 'meta'] IPython及Jupyter Notebook PyOdps还提供了IPython插件。首先我们需要加载插件: In [11]: %load_ext odps In [14]: %enter Out[14]: <odps.inter.Room at 0x1071d1790> In [15]: o = _.odps In [16]: o.get_table('dual') Out[16]: odps.Table name: odps_test_sqltask_finance.`dual` schema: c_int_a : bigint c_int_b : bigint c_double_a : double c_double_b : double c_string_a : string c_string_b : string c_bool_a : boolean c_bool_b : boolean c_datetime_a : datetime c_datetime_b : datetime _下划线能取到上一步的结果。 保存常用的ODPS对象 room除了提供ODPS入口的功能,还能保存常用的ODPS对象。比如,我们能把常用的表起个名字,给保存起来。 In [19]: iris = o.get_table('pyodps_iris') In [23]: room.store('iris_test', iris, desc='保存测试ODPS对象') In [28]: room['iris_test'] Out[28]: odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double name : string In [29]: room.iris_test Out[29]: odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double name : string 这两种方式都可以取到保存的ODPS对象。如果要列出当前room保存的所有ODPS对象,则可以: In [30]: room.display() Out[30]: default desc name iris_test 保存测试ODPS对象 iris 安德森鸢尾花卉数据集 也可以使用IPython插件命令: In [31]: %stores Out[31]: default desc name iris_test 保存测试ODPS对象 iris 安德森鸢尾花卉数据集 要删除某个ODPS对象: In [32]: room.drop('iris_test') In [33]: %stores Out[33]: default desc name iris 安德森鸢尾花卉数据集 执行SQL命令 PyOdps提供了执行SQL的方法,但是在交互式环境下却不甚方便。使用PyOdps提供的IPython插件,可以通过sql命令来直接执行。 在执行时,需要配置全局帐号,如果已经使用了enter方法或者命令,则已经配置;如果没有,则会尝试enter默认的room;如果这也没有配置,则需要使用to_global方法。 In [34]: o = ODPS('**your-access-id**', '**your-secret-access-key**', project='**your-project**', endpoint='**your-end-point**')) In [35]: o.to_global() 这时我们就可以使用sql命令,单个百分号输入单行SQL,多行SQL使用两个百分号: In [37]: %sql select * from pyodps_iris limit 5 |==========================================| 1 / 1 (100.00%) 3s Out[37]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 3 4.6 3.1 1.5 0.2 Iris-setosa 4 5.0 3.6 1.4 0.2 Iris-setosa In [38]: %%sql ....: select * from pyodps_iris ....: where sepallength < 5 ....: limit 5 ....: |==========================================| 1 / 1 (100.00%) 15s Out[38]: sepallength sepalwidth petallength petalwidth name 0 4.9 3.0 1.4 0.2 Iris-setosa 1 4.7 3.2 1.3 0.2 Iris-setosa 2 4.6 3.1 1.5 0.2 Iris-setosa 3 4.6 3.4 1.4 0.3 Iris-setosa 4 4.4 2.9 1.4 0.2 Iris-setosa 在Jupyter Notebook里,多行SQL会提供语法高亮: 持久化pandas DataFrame为ODPS表 使用persist命令即可: In [42]: import pandas as pd In [43]: df = pd.read_csv('https://raw.github.com/pydata/pandas/master/pandas/tests/data/iris.csv') In [48]: %persist df pyodps_iris_test |==========================================| 150 /150 (100.00%) 0s In [49]: from odps.df import DataFrame In [61]: DataFrame(o.get_table('pyodps_iris_test')).head(5) |==========================================| 1 / 1 (100.00%) 0s Out[61]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 3 4.6 3.1 1.5 0.2 Iris-setosa 4 5.0 3.6 1.4 0.2 Iris-setosa 其它交互式方面的增强 在交互式环境下,我们repr一个ODPS表的时候,会打印这个表的schema,包括字段注释,省去了查这个表的meta信息。 In [41]: o.get_table('china_stock', project='odpsdemo') Out[41]: odps.Table name: odpsdemo.`china_stock` schema: d : string # 日期 c : string # 股票代码 n : string # 股票名称 t_close : double # 收盘价 high : double # 最高价 low : double # 最低价 opening : double # 开盘价 l_close : double # 昨日收盘价 chg : double # 涨跌额 chg_pct : double # 涨跌幅 vol : bigint # 成交量 turnover : double # 成交额 partitions: code : string # 股票代码 当使用sql命令或者使用DataFrame框架计算的时候,在终端或者Jupyter Notebook里都提供一个进度条来方便用户来查看执行进度。 后记 PyOdps现在处于快速迭代阶段,我们所有的开发都是开源的。大家如果需要什么功能,可以给我们提issue(GitHub);也可以直接参与到开发,直接给我们发Merge Request就行啦。 欢迎大家一起来建设PyOdps。 github:https://github.com/aliyun/aliyun-odps-python-sdk文档:http://pyodps.readthedocs.org/zh_CN/latest/
PyOdps正式发布DataFrame框架(此处应掌声经久不息),DTer的福音!有了它,就像卷福有了花生,比翼双飞,哦不,如虎添翼。 快过年了,大家一定没心情看长篇大论的分析文章。作为介绍PyOdps DataFrame的开篇文章,我只说说其用起来爽的地方。其余的部分,从使用、问题到实现原理,我会分文章细说。 如果不知道是DataFrame什么,它是存在于pandas和R里的数据结构,你可以把它当做是表结构。如果想快速浏览PyOdps DataFrame能做什么,可以看我们的快速开始文档。 让我们开始吧。 强类型支持 DataFrame API在计算的过程中,从字段到类型都是确定的,因此,若取一个不存在的字段,会丢给你个大大的异常。 In [4]: from odps.df import DataFrame In [5]: iris = DataFrame(o.get_table('pyodps_iris')) In [6]: iris.dtypes Out[6]: odps.Schema { sepallength float64 sepalwidth float64 petallength float64 petalwidth float64 name string } In [7]: iris.field_not_exist --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-7-fc8de079a0de> in <module>() ----> 1 iris.field_not_exist /Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattr__(self, attr) 510 return self[attr] 511 --> 512 raise e 513 514 def output_type(self): AttributeError: 'DataFrame' object has no attribute 'field_not_exist' 如果取存在的字段,自然是没问题啦。 In [11]: iris.sepalwidth.head(5) |==========================================| 1 / 1 (100.00%) 0s Out[11]: sepalwidth 0 3.5 1 3.0 2 3.2 3 3.1 4 3.6 有些方法,比如说取平均数,非数字肯定是不能调用的咯。 In [12]: iris['name'].mean() --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-12-3f90cae4a19f> in <module>() ----> 1 iris['name'].mean() /Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattribute__(self, attr) 171 if new_attr in self._get_attr('_args'): 172 return self._get_arg(new_attr) --> 173 raise e 174 175 def _defunc(self, field): AttributeError: 'Column' object has no attribute 'mean' 数字类型的字段则可以调用。 In [10]: iris.sepalwidth.mean() |==========================================| 1 / 1 (100.00%) 27s 3.0540000000000007 操作数据如此简单 我们常常需要select一个表字段,但是只是不需要一个字段,却需要写一堆SQL。在DataFrame API里,调用exclude方法就行了。 In [13]: iris.exclude('name').head(5) |==========================================| 1 / 1 (100.00%) 0s Out[13]: sepallength sepalwidth petallength petalwidth 0 5.1 3.5 1.4 0.2 1 4.9 3.0 1.4 0.2 2 4.7 3.2 1.3 0.2 3 4.6 3.1 1.5 0.2 4 5.0 3.6 1.4 0.2 使用DataFrame写出来的代码,天然有Python的特点,清晰易懂。某些快捷API,能使得操作更加简单。 比如我们要取name的个数从大到小前10的值分别是多少。 In [16]: iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False)[:10] |==========================================| 1 / 1 (100.00%) 37s Out[16]: name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50 直接使用value_counts来得更快。 In [17]: iris['name'].value_counts()[:10] |==========================================| 1 / 1 (100.00%) 34s Out[17]: name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50 很多时候,写一个SQL,我们需要检查中间结果的执行,就显得很麻烦,我们常常需要选取中间的SQL来执行,在DataFrame的世界,中间结果赋值一个变量就行了,这都不是事儿。 计算的过程和结果展示 在DataFrame的执行过程中,我们在终端里和IPython notebook里,都会有进度条显示任务的完成情况。结果的输出也会有更好的格式化展现,在IPython notebook里会以HTML表格的形式展现。 绘图集成 DataFrame的计算结果能直接调用plot方法来制作图表,不过绘图需要安装pandas和matplotlib。 In [21]: iris.plot() |==========================================| 1 / 1 (100.00%) 0s Out[21]: <matplotlib.axes._subplots.AxesSubplot at 0x10feab610> 导出数据再用excel画图,这事儿……咳咳,未来我们还会提供更好的可视化展现,比如提供交互式的图表。 自定义函数和Lambda表达式 DataFrame支持map方法,想对一个字段调用自定义函数非常方便。 In [30]: GLOBAL_VAR = 3.2 In [31]: def myfunc(x): if x < GLOBAL_VAR: return 0 else: return 1 In [32]: iris['sepalwidth', iris.sepalwidth.map(myfunc).rename('sepalwidth2')].head(5) |==========================================| 1 / 1 (100.00%) 18s Out[32]: sepalwidth sepalwidth2 0 3.5 1 1 3.0 0 2 3.2 1 3 3.1 0 4 3.6 1 可惜apply和聚合的自定义函数,暂时还不支持,期待吧! 延迟执行 DataFrame API的所有操作并不会立即执行,除非用户显式调用execute方法或者一些立即执行的方法。在交互式界面下,打印或者repr对象的时候,内部也会调用execute方法,方便用户使用。 执行优化 DataFrame框架在执行前会对整个查询进行优化,比如连续的projection合并。当用户查看原始表(或者选取某个分区)时,会使用tunnel来获取结果。 PyOdps DataFrame的下一步发展 好了,说了这么多,聊一聊我们DataFrame接下来要做的事情,首先,我们会实现多计算后端,包括pandas,当数据量比较小的时候,我们可以使用本地计算,而不需要等待ODPS的调度;其次,DataFrame框架和我们的机器学习部分会有更多的集成,从数据分析,到算法,一气呵成,大伙看到文章的时候,相关功能应该已经可用了。 PyOdps非常年轻,才短短几个月的时间。我们的整个项目,在GitHub上,是开源的。我个人非常希望大家能参与到开源的建设中来,能提个建议也是极好的。所以,我会写文章详述我们PyOdps的实现原理,希望大家一起把ODPS建设得更好。 github:https://github.com/aliyun/aliyun-odps-python-sdk文档:http://pyodps.readthedocs.org/zh_CN/latest/
2020年12月
2020年10月
2019年06月
from odps import __version__
print(__version__)
看下pyodps版本?
RSS可以自行解析,然后用我们的表上传接口来上传数据。
文档:http://pyodps.readthedocs.io/zh_CN/latest/base-tables-zh.html#id7
另外,可以加PyODPS答疑群(钉钉)
用PyODPS可以写成这样,mysql那边使用sqlalchemy
import itertools
from sqlalchemy import create_engine, MetaData, Table
from odps.df import DataFrame
DB_CONNECT_STR = 'mysql+mysqldb://root:@localhost/mydb?charset=utf8'
engine = create_engine(DB_CONNECT_STR, echo=True)
conn = engine.connect()
metadata = MetaData(engine)
table = Table('mysql_table', metadata, autoload=True)
df = DataFrame(odps.get_table('my_demo_table', project='my_project'))
selected = df.filter(df.pdate == '')['imei', 'time_in', 'ntotalvote', 'ntotalcurr']
records = []
for i, record in zip(itertools.count(1), selected.execute()):
if i % 100:
conn.execute(conn.insert(), [dict(r) for r in records])
records = []
records.append(record)
if records:
conn.execute(conn.insert(), [dict(r) for r in records])
是因为交互式情况下,print或者repr的时候会执行立即执行的方法。在非交互式环境下需要显式调用立即执行的方法。
所以你可以在IDE里:
print(users.count().execute())
或者可以打开interactive选项,这样在print或者repr的时候也执行计算。
from odps jmport options
options.interactive = True
print(users.count())
有两个方法
1、 SQL写成create table as select *,这样再使用tunnel下载
odps.execute_sql('create table my_tmp_table as select ***')
t = odps.get_table('my_tmp_table')
with t.open_reader() as reader:
for record in reader:
2、 使用instance tunnel,可以用tunnel读取instance执行结果。这个会在0.6版本完成,预计在下周末或者下下周初发布。