
本文主要从以下几个方面展开: Mars简介 典型场景 Demo 最佳实践 一、Mars简介 Mars是统一的数据科学平台,它用来加速传统的Python数据科学技术栈,在单机中也可以用多核加速,或用分布式来加速。Mars可以部署在单机的分布式集群,或者Kubernetes和Hadoop Yarn上。 Mars整个框架构建在单机的并行和分布式的这两个调度的基础之上,它的数据科学基础包括三个核心部分,Tensor、DataFrame和Remote。而构建在这个基础之上的,是Mars Learn模块,它可以兼容Scikit-learn API,能简单地进行更大数据规模的分布式处理。此外,Mars还支持深度学习和机器学习的框架,比如能轻松运行TensorFlow、PyTorch等,而且可视化也可以在Mars上完成。除此之外,Mars还支持了丰富的数据源。 从传统Python技术栈到Mars也非常简单,比如在NumPy和Pandas里要变成Mars,只需要替换import,然后后面变为延迟执行即可。 普通的Python函数,在调用的时候变成mr.spawn来延迟这个过程,最后通过execute来并发执行,不用担心Mars是运行在单机上还是分布式运行。 而Mars上的TensorFlow大部分也一样,区别在于main函数部分的变化。最后,我们需要通过run_tensorflow_script的方式把脚本运行到Mars中。 二、典型场景 场景1. CPU和GPU混合计算 在安全和金融领域可以用Mars做CPU和GPU的混合计算,加速现有的工作流。 在这个领域,因为传统大数据平台挖掘周期长,资源紧张,需要很久来执行任务,不能达到客户需求。所以能用Mars DataFrame来加速数据处理,它可以做大规模数据排序,并帮助用户做高阶统计和聚合分析。 另外在安全领域有很多无监督学习的算法,Mars learn能加速无监督学习,同时拉起分布式深度学习计算加速现有的深度学习训练。之后,对于某些计算任务也可以利用GPU来加速。 场景2. 可解释性计算 在广告领域,在广告归因和洞察特征的解释算法中,因为本身计算量大,所以耗时很长。这种情况下,单机加速是比较困难的,基于传统大数据平台的分布式也不太灵活,但是通过Mars remote,可以很轻松地把计算分布到几十台机器上来加速,达到百倍的性能提升。 场景3. 大规模K-最邻近算法 Mars非常广泛地应用在K-最邻近算法中,因为Embedding越来越流行,它让向量表述实体非常常见。另外,Mars的NearestNeighbors算法兼容scikit-learn,它里面有暴力算法,而用户也需要暴力算法来进行大规模计算,可以通过多个worker来完成,从而让性能提升百倍。最后,Mars支持分布式的方式加速Faiss和Proxima,达到千万级别和上亿级别的规模。 三、Demo Demo1. 分析豆瓣电影数据 我们从这个Demo看一下Mars如何加速pandas数据处理及其可视化。 开始演示之前我们需要安装Mars。这里已经创建了Jupyter,然后 pip install pymars。 安装之后,我们可以到IPython进行验证,可以看到下面的结果没有问题,接下来我们就可以进入到Jupyter notebook里。 我们开始demo。这个数据可以在GitHub地址下载,然后我们用pandas来分析电影的数据,使用ipython memory usage来查看内存使用。 我们的数据主要用到4个CSV文件,分别是movies、ratings、users和comments。 接下来根据上映日期统计有多少电影发布。这里先处理一下数据,让发行日期只取到年份,去掉日期,并对年份做聚合。 数据出来后,可以用pandas bokeh把图绘制出来,并通过交互式的方式查看。 接下来看电影评分的统计。首先把有评分的电影筛选出来,然后把豆瓣评分的数值数量从大到小进行排序。可以看到,最多的评分是6.8分。 同样,通过pandas bokeh把它画成柱状图,评分差不多呈现正态分布。 接下来做一个标签词云,看电影哪个标签词最多,这里从movies取出tags,用斜杠分割,然后max words是50。 接下来我们再对电影的Top K进行分析。首先按电影ID进行聚合,求出评价的平均值和个数。然后我们对评价个数进行过滤,从高到低,算出top20的电影。 然后做评论数据分析。因为评论是中文的,所以需要做一个分词,然后对每一句话做切分,在统计的时候进行排序。这里可以加一个进度条,在处理数据的时候方便看到进程。这个过程大概花了20分钟,所以在单机上跑大任务的时候对机器的压力还是比较大。 这是最终的词云图。 接下来我们用Mars做同样的分析任务。首先是对Mars环境进行部署,然后这里有5个worker,每个worker是8个CPU和32G内存。还是一样,我们打开内存的监控,做一些import,这里把import Pandas替换成import mars.dataframe,然后Numpy是import mars.tensor。 随后我们在SDK里来创建to mars dataframe,这一步几乎没有用到内存,最终得到的结果也和之前一样。 我们用同样的方式来分析上映日期的电影个数和电影评分。得益于Mars跟Pandas的高度兼容,我们也能用Pandas bokeh来呈现结果。 电影评论的分析也一样,但是在显示的时候,Mars只会拉取头几条和最后几条,所以客户端几乎没有内存使用。而且整个running过程只用了45秒,与之前的20分钟相比提升了几十倍性能。 接下来我们用Mars做一个地区的统计,让它有一个动态的效果。首先我们看一下刚刚计算过的已经released的电影dataframe,然后取1980-2019这几年的电影,而regions部分可能有多个,所以用斜杠分割开,最后执行排出top10地域电影。 然后我们通过bar chart race来生成动态效果。 Demo2. 豆瓣电影推荐 第二个demo我们会基于刚才豆瓣电影的数据来做一个推荐。我们首先会用TensorFlow Mars来进行训练,接着用Mars分布式KNN算法来加速召回计算。 我们先使用单机的技术栈,这个数据已经分成了训练和测试集,所以我们先to pandas把它下载到本地,接着来对用户和电影做一个label encode,把它变成一个数字,而不是字符串的值。随后我们对数据进行处理,先按照时间排序,然后按照用户进行分组,生成分组聚合的结果。 接下来开始训练,我们需要用TensorFlow训练出代表user的embedding。之前说过embedding,可以对任一实体用向量描述,所以得到embedding之后,我们在给用户推荐电影时就可以查找在这个向量空间里面跟这个用户比较接近的电影embedding。 训练后我们可以保存向量,这里的搜索规模是60万乘7万,单机花费了22分钟,但如果达到千万乘千万级别,搜索耗时要超过800小时,这是不可接受的。 接下来我们看如何用Mars来实现这一过程。首先创建一个Mars集群,这里有8个worker。然后和上面一样,对数据进行预处理,做label encode,按时间排序,按user分组生成分组聚合。 这里唯一的区别是Mars会自动推断DataFrame的结果,如果推断失败就需要用户自己提供dtypes和output type。 然后是执行和训练。这里TensorFlow可以写Python文件,不用写到notebook里。 接着我们用Mars的run tensorflow script来跑这个脚本,然后指定worker是8。可以看到,执行的时间缩小到了23分钟。同时,我们也拿到了最终的embedding,用Mars做embedding只需1分25秒,比刚刚的时间提升个十倍左右。1400万乘1400万也可以稳定在1小时左右,与单机800个小时相比提升是非常巨大的。 四、最佳实践 首先尽量不要使用to pandas和to numpy,因为这会把Mars的分布式数据变成单机的数据,失去了Mars本身的优势,除非这个操作不能用Mars实现;其次,Mars tensor、DataFrame和learn由于本身受限于API的原因需要自己写一些函数,所以可以考虑用Mars remote来加速,把操作抽象成函数;第三,Pandas的加速技巧在Mars DataFrame依然适用,比如可以使用更高效的数据类型,可以优先使用内建操作,使用apply取代循环。 以上就是今天的课程,欢迎大家继续关注后续内容。
本文分为4个部分: Mars的背景和现状 Mars解决了什么问题 Mars背后的哲学 总结与展望 一、Mars的背景和现状 说到加速数据科学的新方式,就不得不说什么是数据科学,以下是维基的定义: 数据科学通过运用各种相关的数据来帮助非专业人士来理解问题。第一,它的目标是从数据中提取输入价值的部分,第二,它要生产数据产品,第三它结合了非常多重要的技术,包括数学、统计、机器学习、可视化,最后,它需要真正解决问题。 它包含三个方面,计算机科学、数学和统计、领域和商业知识。它们结合起来分别是机器学习、软件开发和传统研究,中间是数据科学。 数据科学技术栈包含语言、数据分析、数据工程、机器学习、Web开发、运维和商业智能。每个技术栈都包含不同的工具,提供不同的数据服务。 传统Python数据科学栈的三大底座是NumPy、Pandas和SciPy。NumPy是最核心的部分,它用来做数值计算,几乎其他所有Python的数据科学技术栈都建立在NumPy上,因为它有最基础的数据结构,也就是多维数组;另外,Pandas也用NumPy实现,它上面有很多API来进行分析操作;而SciPy主要负责科学计算。在此基础上,是机器学习和可视化,同时还有丰富的Python函数。 上面是Python数据科学技术栈整体的状况,他们有几大好处,比如广泛使用,是事实标准;上手成本低,容易为初学者和学生入门;和语言结合紧密,能用Python来组织函数调用。但是它们都是单机的库,不能处理很大的数据量,所以需要用到大数据的数据工程技术栈,比如Hadoop、Hive、Spark等。它们虽然支持多语言,没有强绑定,但是学习门槛比较陡峭,也需要对系统本身有足够的了解。做数据科学需要把精力放在数据本身,而不是工具,但是这几个库让工作围绕着库展开,而非数据。因此,我们是否能把这两个工作连接起来,利用传统的技术价值而不是关注大数据系统本身,来解决很多问题。 现在大家说摩尔定律已经失效,我们可以回顾一下它的历史。早期它通过复杂指令集和精简指令集的方式让速度提升,但是随着缩放比例定律和阿姆达尔定律的终结,未来是不是要20年才能达到2倍效率的提升呢? 另外随着深度学习、机器学习和AI的火热,机器学习模型也会越来越大,它的训练已经呈指数级增长,但是摩尔定律并没有追上机器学习模型训练发展的速度。 另外一个现状就是技术栈的问题,NumPy、Pandas等它们更多只能在单核上来应用计算,但是阿姆达尔定律下的核数是会越来越多,所以并没有很好地进行利用。同时,不管是计算量还是数据规模的增长速度,都超过了摩尔定律的速度,所以也不能用单机解决问题。 基于以上,加速数据科学主要有两种方式。第一是Scale up,利用多核和更好的硬件,比如GPU、FPGA和TPU等,以及现有的库,包括Modin、CuPy、Rapids等来提升效率。第二是 Scale out,利用分布式的方式来加速,因为单核的性能总归是有限的,可以利用Ray、Dask和Modin等。而结合Scale up和Scale out,就可以构建一个大规模的更好的硬件集成,比如今天介绍的加速数据科学的新方式Mars。 二、Mars解决了什么问题 Mars就是我们试图构建的“桥”,能来兼顾小规模和大规模数据处理。大规模数据处理能构建集群,有三种主要方式,第一是在物理机上,第二是kubernetes,第三是Hadoop Yarn的调度器上,拉起Mars的集群。 Mars的核心基础部分对应着传统Python数据技术栈,比如Mars Tensor对应NumPy,DataFrame对应Pandas等。而构建在这个基础之上的,是Mars Learn模块,它可以兼容Scikit-learn API,能简单地进行更大数据规模的分布式处理。此外,Mars还支持深度学习和机器学习的框架,比如能轻松运行TensorFlow、PyTorch等,而且可视化也可以在Mars上宣布。除此之外,Mars还支持了丰富的数据源。 从传统Python技术栈到Mars也非常简单,比如在NumPy和Pandas里要变成Mars,只需要替换import,然后后面变为延迟执行即可。 普通的Python函数,在调用的时候变成mr.spawn来延迟这个过程,最后通过execute来并发执行,不用担心Mars是运行在单机上还是分布式运行。 而Mars上的TensorFlow大部分也一样,区别在于main函数部分的变化。 三、Mars背后的设计哲学 第一是分而治之,不重复造轮子。比如创建一个Mars的Tensor,对里面所有元素进行求和操作,在触发execute之后,Mars会生成小任务的计算图,它可以将小任务调度到分布式的集群中来运行。此外还做了很多优化,通过算子融合来提升性能。 第二,在Mars的世界,一切皆可并行。在Mars里实现了并行正则排序算法,比如创建一个cluster,有5个Worker,每个Worker是8核32G,那么相比单机的NumPy函数,它提升了4倍。 下面看下Mars整体数据的处理流程。我们通过客户端的代码触发执行,生成一个粗粒度的计算图,然后通过Web服务器提交任务,并将任务转发给Scheduler,然后在这里切分成小任务。随后,Scheduler根据Worker工作负载的情况来把它分发到各个机器上运行,这里会用到调度的策略等等。这个过程中,Mars会自动触发数据传输工作,保证在节点执行的时候它所有输入的数据都在这台机器上。此外Mars还会自动进行Spill操作,当我们内存不够的时候,它会把不常用的数据Spill到磁盘上,让Mars可以处理远超过这个内存能放下的数据规模。 四、总结与展望 首先Mars是完全开源的项目,完全遵循开源的规范,而且项目的progress都能在GitHub上看到;其次Mars有兼容性,它的API高度兼容NumPy、Pandas和Scikit-learn;同时,它的单机、多核与分布式的处理性能也更高;最后是交互性,Mars Remote可以把以前的代码变成分布式代码,并且内部也可以与第三方库做集成。 Github地址:https://github.com/mars-project/mars 下面是关于Mars未来的展望,大家可以访问专栏。未来我们的开发计划重心放在提升Mars learn的接口覆盖率和Mars DataFrame接口覆盖率上,统一单机和分布式执行层等等。 以上就是今天的课程,欢迎大家继续关注Mars。戳我观看直播回放
Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。 启动 Mars 分布式环境可以参考: 命令行方式在集群中部署。 Kubernetes 中部署。 MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。 如何使用 Mars Remote API 使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。 拿用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。 from typing import List import numpy as np def calc_chunk(n: int, i: int): # 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数 rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs: List[int], N: int): # 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值 return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi) %%time 下可以看到结果: 3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s 在单机需要 12.3 s。 要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。 import mars.remote as mr # 函数调用改成 mars.remote.spawn fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] # 把 spawn 的列表传入作为参数,再 spawn 新的函数 pi = mr.spawn(calc_pi, args=(fs, N)) # 通过 execute() 触发执行,fetch() 获取结果 print(pi.execute().fetch()) %%time 下看到结果: 3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s 结果一模一样,但是却有数倍的性能提升。 可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。 一个例子 为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。 困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。 准备数据 这个例子里我们使用 otto 数据集。 首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。 import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import train_test_split def gen_data(): df = pd.read_csv('otto/train.csv') X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) X_train, X_test, y_train, y_test = gen_data() 模型 接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。 RandomForest: from sklearn.ensemble import RandomForestClassifier def random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = RandomForestClassifier(verbose=verbose, **kw) model.fit(X_train, y_train) return model 接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。 def gen_random_forest_parameters(): for n_estimators in [50, 100, 600]: for max_depth in [None, 3, 15]: for criterion in ['gini', 'entropy']: yield { 'n_estimators': n_estimators, 'max_depth': max_depth, 'criterion': criterion } LogisticRegression 也是这个过程。我们先定义模型。 from sklearn.linear_model import LogisticRegression def logistic_regression(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = LogisticRegression(verbose=verbose, **kw) model.fit(X_train, y_train) return model 接着生成供 LogisticRegression 使用的超参。 def gen_lr_parameters(): for penalty in ['l2', 'none']: for tol in [0.1, 0.01, 1e-4]: yield { 'penalty': penalty, 'tol': tol } XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。 from xgboost import XGBClassifier def xgb(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = XGBClassifier(verbosity=int(verbose), **kw) model.fit(X_train, y_train) return model 生成一系列超参。 def gen_xgb_parameters(): for n_estimators in [100, 600]: for criterion in ['gini', 'entropy']: for learning_rate in [0.001, 0.1, 0.5]: yield { 'n_estimators': n_estimators, 'criterion': criterion, 'learning_rate': learning_rate } 验证 接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。 from sklearn.metrics import log_loss def metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float: if isinstance(model, bytes): model = pickle.loads(model) y_pred = model.predict_proba(X_test) return log_loss(y_test, y_pred) def train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): # 把训练和验证封装到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params) metric = metric_model(model, X_test, y_test) return model, metric 找出最好的模型 做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。 results = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(random_forest, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(logistic_regression, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(xgb, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) 运行一下,需要相当长时间,我们省略掉一部分输出内容。 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288 # 省略其他模型的输出结果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s 从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。 使用 Remote API 分布式加速 现在我们尝试使用 Remote API 通过分布式方式加速整个过程。 集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。 n_cores = 8 mem = 2 * n_cores # 16G # o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G cluster = o.create_mars_cluster(10, n_cores, mem, image='extended') 为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。 if not o.exist_resource('otto_train.csv'): with open('otto/train.csv') as f: # 上传资源 o.create_resource('otto_train.csv', 'file', fileobj=f) def gen_data(): # 改成从资源读取 df = pd.read_csv(o.open_resource('otto_train.csv')) X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) 稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。 import mars.remote as mr # n_output 说明是 4 输出 # execute() 执行后,数据会读取到 Mars 集群内部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute() # remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data 目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。 def distributed_train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): model, metric = train_and_metric(train_func, train_params, X_train, y_train, X_test, y_test, verbose=verbose) return pickle.dumps(model), metric 后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。 接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。 import numpy as np tasks = [] models = [] metrics = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(random_forest, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(logistic_regression, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): # fixed random_state params['random_state'] = 123 # 再指定并发为核的个数 params['n_jobs'] = n_cores task = mr.spawn(distributed_train_and_metric, args=(xgb, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # 把顺序打乱,目的是能分散到 worker 上平均一点 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute() 可以看到代码几乎一致。 运行查看结果: CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s 时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。 细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。 以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。 print(models[0].fetch_log()) 输出我们简略一部分。 building tree 1 of 50 building tree 2 of 50 building tree 3 of 50 building tree 4 of 50 building tree 5 of 50 building tree 6 of 50 # 中间省略 building tree 49 of 50 building tree 50 of 50 要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。 想要了解 fetch_log 接口,可以查看 文档。 还有更多 Mars Remote API 的能力其实不止这些,举个例子,在 remote 内部可以 spawn 新的函数;也可以调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容,读者们可以先行探索,后续我们再写别的文章介绍。 总结 Mars Remote API 通过并行和分布式 Python 函数,用很小的修改代价,极大提升了执行效率。 对 Mars 项目感兴趣的读者们,欢迎 star Github 项目,以及订阅我们的专栏。 Mars 开源地址:https://github.com/mars-project/mars Mars 团队专栏:https://zhuanlan.zhihu.com/mars-project 联系我们 除了可以在 Github Issues 和我们联系,也可以加入钉钉群 32697156 和我们交流。
Mars 简介 Mars 能利用并行和分布式技术,加速 Python 数据科学栈,包括 numpy、pandas 和 scikit-learn。新的 Remote API 能轻松并行执行 Python 函数。此外,也能轻松与 TensorFlow、PyTorch 和 XGBoost 集成。 Mars tensor 的接口和 numpy 保持一致,但支持大规模高维数组。样例代码如下。 import mars.tensor as mt a = mt.random.rand(10000, 50) b = mt.random.rand(50, 5000) a.dot(b).execute() Mars DataFrame 接口和 pandas 保持一致,但可以支撑大规模数据处理和分析。样例代码如下。 import mars.dataframe as md ratings = md.read_csv('Downloads/ml-20m/ratings.csv') movies = md.read_csv('Downloads/ml-20m/movies.csv') movie_rating = ratings.groupby('movieId', as_index=False).agg({'rating': 'mean'}) result = movie_rating.merge(movies[['movieId', 'title']], on='movieId') result.sort_values(by='rating', ascending=False).execute() Mars learn 保持和 scikit-learn 接口一致。样例代码如下。 import mars.dataframe as md from mars.learn.neighbors import NearestNeighbors df = md.read_csv('data.csv') nn = NearestNeighbors(n_neighbors=10) nn.fit(df) neighbors = nn.kneighbors(df).fetch() Mars learn 可以很方便地与 TensorFlow、PyTorch 和 XGBoost 集成,点击链接查看文档。 Mars Remote 可以轻松并行和分布式执行一系列 Python 函数。样例代码如下: import numpy as np import mars.remote as mr def calc_chunk(n, i): rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs, N): return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] pi = mr.spawn(calc_pi, args=(fs, N)) print(pi.execute().fetch()) 这里 calc_chunk 被 spawn 了 20次,这20次调用可以分布到多核,或者到集群里并行执行。最后 Mars 会自动等待这20次调用结束后触发 calc_pi 的执行。 在 MaxCompute 上使用 Mars,我们提供了简单易用的接口来拉起 Mars 集群,用户不需要关心安装和维护集群。同时,通过 MaxCompute 拉起的 Mars,也支持直接读写 MaxCompute 表。 申请试用 目前我们采用申请开通的方式,公共云用户请提工单申请。 环境准备 要在 MaxCompute 上运行 Mars,需要有相应的运行环境。这可以分为如下几种情况。 开箱即用的环境,如 dataworks,会包含所需要的依赖。 其他环境,需要自己安装相关依赖。 我们分别展开。 开箱即用的环境 开箱即用的环境,如 dataworks 的 pyodps3 节点,已经包含了 PyODPS 和 Mars。 在新建的 pyodps3 节点里运行如下命令检查版本,确保满足要求。 from odps import __version__ as odps_version from mars import __version__ as mars_version print(odps_version) print(mars_version) 输出的第一个为 PyODPS 版本,第二个为 Mars 版本。要求 PyODPS 至少是 0.9.0 。 其他环境 这个环境就要求通过 pip 安装 PyODPS 和 Mars。Python 版本推荐使用 3.7 版本,至少需要是 3.5 版本。 通过如下命令安装: pip install -U pip # 可选,确保 pip 版本足够新 pip install pyarrow==0.12.1 # 目前 pyarrow 版本固定到 0.12.1 pip install pyodps>0.9.0 # pyodps 需要至少 0.9.0 pip install pymars>=0.4.0rc1 # mars 版本需要至少是 0.4.0rc1 准备 ODPS 入口 ODPS 入口是 MaxCompute 所有操作的基础: 对于开箱即用的环境,如 dataworks,我们会自动创建 o 即 ODPS 入口实例,因此可以不需要创建。 对于其他环境,需要通过 access_id、access_key 等参数创建,详细参考 文档。 基本概念 MaxCompute 任务实例:MaxCompute 上任务以 instance 概念存在。Mars 集群也是通过一个 MaxCompute Instance 拉起。 Logview 地址:每个 MaxCompute instance 包含一个 logview 地址来查看任务状态。拉起 Mars 集群的 instance 也不例外。 Mars UI: Mars 集群拉起后,会包含一个 Web UI,通过这个 Web UI,可以查看 Mars 集群、任务状态,可以提交任务。当集群拉起后,一般来说就不需要和 MaxCompute 任务实例交互了。 Mars session:Mars session 和具体的执行有关,一般情况下用户不需要关心 session,因为会包含默认的 session。通过 o.create_mars_cluster 创建了 Mars 集群后,会创建默认连接到 Mars 集群的 session。 Jupyter Notebook:Jupyter Notebook 是一个基于网页的用于交互式计算的应用程序,可以用来开发、文档编写、运行代码和展示结果。 基础用法 创建 Mars 集群 准备好环境后,接着我们就可以拉起 Mars 集群了。 有了 o 这个对象后,拉起 Mars 集群非常简单,只需要运行如下代码。 from odps import options options.verbose = True # 在 dataworks pyodps3 里已经设置,所以不需要前两行代码 client = o.create_mars_cluster(5, 4, 16, min_worker_num=3) 这个例子里指定了 worker 数量为 5 的集群,每个 worker 是4核、16G 内存的配置,min_worker_num 指当 worker 已经起了3个后,就可以返回 client 对象了,而不用等全部 5 个 worker 都启动再返回。Mars 集群的创建过程可能比较慢,需要耐心等待。 注意:申请的单个 worker 内存需大于 1G,CPU 核数和内存的最佳比例为 1:4,例如单 worker 4核、16G。同时,新建的 worker 个数也不要超过 30 个,否则会对镜像服务器造成压力,如果需要使用超过 30 个 worker,请工单申请。 这个过程中会打印 MaxCompute instance 的 logview、 Mars UI 以及 Notebook 地址。Mars UI 可以用来连接 Mars 集群,亦可以用来查看集群、任务状态。 Mars 集群的创建就是一个 MaxCompute 任务,因此也有 instance id、logview 等 MaxCompute 通用的概念。 提交作业 Mars 集群创建的时候会设置默认 session,通过 .execute() 执行时任务会被自动提交到集群。 import mars.dataframe as md import mars.tensor as mt md.DataFrame(mt.random.rand(10, 3)).execute() # execute 自动提交任务到创建的集群 停止并释放集群 目前一个 Mars 集群超过3天就会被自动释放。当 Mars 集群不再需要使用时,也可以通过调用 client.stop_server() 手动释放: client.stop_server() MaxCompute 表读写支持 创建了 Mars 集群后,集群内的 Mars 任务可以直读和直写 MaxCompute 表。 读表 通过 o.to_mars_dataframe 来读取 MaxCompute 表,并返回 Mars DataFrame。 In [1]: df = o.to_mars_dataframe('test_mars') In [2]: df.head(6).execute() Out[2]: col1 col2 0 0 0 1 0 1 2 0 2 3 1 0 4 1 1 5 1 2 写表 通过 o.persist_mars_dataframe(df, 'table_name') 将 Mars DataFrame 保存成 MaxCompute 表。 In [3]: df = o.to_mars_dataframe('test_mars') In [4]: df2 = df + 1 In [5]: o.persist_mars_dataframe(df2, 'test_mars_persist') # 保存 Mars DataFrame In [6]: o.get_table('test_mars_persist').to_df().head(6) # 通过 PyODPS DataFrame 查看数据 col1 col2 0 1 1 1 1 2 2 1 3 3 2 1 4 2 2 5 2 3 使用 Mars 集群自带的 Jupyter Notebook 创建 Mars 集群会自动创建一个 Jupyter Notebook 以编写代码。 新建一个 Notebook 会自动设置 session,提交任务到集群。因此在这个 notebook 内也不需要显示创建 session。 import mars.dataframe as md md.DataFrame(mt.random.rand(10, 3)).sum().execute() # 在 notebook 里运行,execute 自动提交到当前集群 有一点要注意:这个 notebook 不会保存你的 notebook 文件,所以要记得自行保存。 用户也可以使用自己的 notebook 连接到集群,此时参考 使用已经创建的 Mars 集群 章节。 其他用法 使用已经创建的 Mars 集群 首先,我们可以通过 instance id 重建 Mars 集群的 client。 client = o.create_mars_cluster(instance_id=**instance-id**) 如果只是想使用 Mars,可以使用 Mars session 来连接。给定 Mars UI 的地址。则: from mars.session import new_session new_session('**Mars UI address**').as_default() # 设置为默认 session 获取 Mars UI 地址 Mars 集群创建的时候指定了 options.verbose=True 会打印 Mars UI 地址。 也可以通过 client.endpoint 来获取 Mars UI。 print(client.endpoint) 获取 Logview 地址 创建集群的时候指定了 options.verbose=True 会自动打印 logview。 也可以通过 client.get_logview_address() 获取 logview 地址。 print(client.get_logview_address()) 获取 Jupyter Notebook 地址 Mars 集群创建的时候指定了 options.verbose=True 会打印 Jupyter Notebook 地址。 也可以通过 client.get_notebook_endpoint() 获取 Jupyter Notebook 地址。 print(client.get_notebook_endpoint()) Mars 和 PyODPS DataFrame 对比 有同学会问,Mars 和 PyODPS DataFrame 有什么区别呢? API Mars DataFrame 的接口完全兼容 pandas。除了 DataFrame,Mars tensor 兼容 numpy,Mars learn 兼容 scikit-learn。 而 PyODPS 只有 DataFrame 接口,和 pandas 的接口存在着很多不同。 索引 Mars DataFrame 有 pandas 索引的概念。 In [1]: import mars.dataframe as md In [5]: import mars.tensor as mt In [7]: df = md.DataFrame(mt.random.rand(10, 3), index=md.date_range('2020-5-1', periods=10)) In [9]: df.loc['2020-5'].execute() Out[9]: 0 1 2 2020-05-01 0.061912 0.507101 0.372242 2020-05-02 0.833663 0.818519 0.943887 2020-05-03 0.579214 0.573056 0.319786 2020-05-04 0.476143 0.245831 0.434038 2020-05-05 0.444866 0.465851 0.445263 2020-05-06 0.654311 0.972639 0.443985 2020-05-07 0.276574 0.096421 0.264799 2020-05-08 0.106188 0.921479 0.202131 2020-05-09 0.281736 0.465473 0.003585 2020-05-10 0.400000 0.451150 0.956905 PyODPS 里没有索引的概念,因此跟索引有关的操作全部都不支持。 数据顺序 Mars DataFrame 一旦创建,保证顺序,因此一些时序操作比如 shift,以及向前向后填空值如ffill、bfill ,只有 Mars DataFrame 支持。 In [3]: df = md.DataFrame([[1, None], [None, 1]]) In [4]: df.execute() Out[4]: 0 1 0 1.0 NaN 1 NaN 1.0 In [5]: df.ffill().execute() # 空值用上一行的值 Out[5]: 0 1 0 1.0 NaN 1 1.0 1.0 PyODPS 由于背后使用 MaxCompute 计算和存储数据,而 MaxCompute 并不保证数据顺序,所以这些操作在 MaxCompute 上都无法支持。 执行层 PyODPS 本身只是个客户端,不包含任何服务端部分。PyODPS DataFrame 在真正执行时,会将计算编译到 MaxCompute SQL 执行。因此,PyODPS DataFrame 支持的操作,取决于 MaxCompute SQL 本身。此外,每一次调用 execute 方法时,会提交一次 MaxCompute 作业,需要在集群内调度。 Mars 本身包含客户端和分布式执行层。通过调用 o.create_mars_cluster ,会在 MaxCompute 内部拉起 Mars 集群,一旦 Mars 集群拉起,后续的交互就直接和 Mars 集群进行。计算会直接提交到这个集群,调度开销极小。在数据规模不是特别大的时候,Mars 应更有优势。 使用场景指引 有同学会关心,何时使用 Mars,何时使用 PyODPS DataFrame?我们分别阐述。 适合 Mars 的使用场景。 如果你经常使用 PyODPS DataFrame 的 to_pandas() 方法,将 PyODPS DataFrame 转成 pandas DataFrame,推荐使用 Mars DataFrame。 Mars DataFrame 目标是完全兼容 pandas 的接口以及行为,如果你熟悉 pandas 的接口,而不愿意学习 PyODPS DataFrame 的接口,那么使用 Mars。 Mars DataFrame 因为兼容 pandas 的行为,因此如下的特性如果你需要用到,那么使用 Mars。 Mars DataFrame 包含行和列索引,如果需要使用索引,使用 Mars。 Mars DataFrame 创建后会保证顺序,通过 iloc 等接口可以获取某个偏移的数据。如 df.iloc[10] 可以获取第10行数据。此外,如 df.shift() 、df.ffill() 等需要有保证顺序特性的接口也在 Mars DataFrame 里得到了实现,有这方面的需求可以使用 Mars。 Mars 还包含 Mars tensor 来并行和分布式化 Numpy,以及 Mars learn 来并行和分布式化 scikit-learn、以及支持在 Mars 集群里分布式运行 TensorFlow、PyTorch 和 XGBoost。有这方面的需求使用 Mars。 Mars 集群一旦创建,后续不再需要通过 MaxCompute 调度,任务可以直接提交到 Mars 集群执行;此外,Mars 对于中小型任务(数据量 T 级别以下),会有较好的性能。这些情况可以使用 Mars。 适合 PyODPS DataFrame 的使用场景 PyODPS DataFrame 会把 DataFrame 任务编译成 MaxCompute SQL 执行,如果希望依托 MaxCompute 调度任务,使用 PyODPS DataFrame。 PyODPS DataFrame 会编译任务到 MaxCompute 执行,由于 MaxCompute 相当稳定,而 Mars 相对比较新,如果对稳定性有很高要求,那么使用 PyODPS DataFrame。 数据量特别大(T 级别以上),使用 PyODPS DataFrame。 Mars 参考文档 Mars 开源地址:https://github.com/mars-project/mars Mars 文档:https://docs.pymars.org/zh_CN/latest/ Mars 团队专栏:https://zhuanlan.zhihu.com/mars-project 技术支持 技术支持请加 PyODPS 钉钉群:11701793 FAQ Q:一个用户创建的 Mars 集群,别人能不能用。 A:可以,参考 使用已经创建的 Mars 集群 章节。
文章原载于 Mars 团队专栏,欢迎关注。 从这篇文章开始,我们开始一个新的读 paper 系列。 今天要介绍的 paper 是 Towards Scalable Dataframe Systems,目前还是预印本。作者 Devin Petersohn 来自 Riselab,该实验室的前身是大名鼎鼎的 APMLab,诞生了 Apache Spark、Apache Mesos 等一系列著名开源项目。 个人觉得这篇 paper 蛮有意义的,第一次(据我所知)试图在学术上对 DataFrame 做定义,给了很好的理论指导意义。 这篇文章我不会拘泥于原 paper,我会加入自己的理解。本篇文章会大致分三部分: 什么是真正的 DataFrame? 为什么现在的所谓 DataFrame 系统,典型的如 Spark DataFrame,有可能正在杀死 DataFrame 的原本含义。 从 Mars DataFrame 的角度来看这个问题。 什么是真正的 DataFrame? 起源 最早的 "DataFrame" (开始被称作 "data frame"),来源于贝尔实验室开发的 S 语言。"data frame" 在 1990 年就发布了,书《S 语言统计模型》第3章里详述了它的概念,书里着重强调了 dataframe 的矩阵起源。 书中描述 DataFrame 看上去很像矩阵,且支持类似矩阵的操作;同时又很像关系表。 R 语言,作为 S 语言的开源版本,于 2000 年发布了第一个稳定版本,并且实现了 dataframe。pandas 于 2009 年被开发,Python 中于是也有了 DataFrame 的概念。这些 DataFrame 都同宗同源,有着相同的语义和数据模型。 DataFrame 数据模型 DataFrame 的需求来源于把数据看成矩阵和表。但是,矩阵中只包含一种数据类型,未免过于受限;同时,关系表要求数据必须要首先定义 schema。对于 DataFrame 来说,它的列类型可以在运行时推断,并不需要提前知晓,也不要求所有列都是一个类型。因此,DataFrame 可以理解成是关系系统、矩阵、甚至是电子表格程序(典型如 Excel)的合体。 跟关系系统相比,DataFrame 有几个特别有意思的属性,让 DataFrame 因此独一无二。 保证顺序,行列对称 首先,无论在行还是列方向上,DataFrame 都是有顺序的;且行和列都是一等公民,不会区分对待。 拿 pandas 举例子,当创建了一个 DataFrame 后,无论行和列上数据都是有顺序的,因此,在行和列上都可以使用位置来选择数据。 In [1]: import pandas as pd In [2]: import numpy as np In [3]: df = pd.DataFrame(np.random.rand(5, 4)) In [4]: df Out[4]: 0 1 2 3 0 0.736385 0.271232 0.940270 0.926548 1 0.319533 0.891928 0.471176 0.583895 2 0.440825 0.500724 0.402782 0.109702 3 0.300279 0.483571 0.639299 0.778849 4 0.341113 0.813870 0.054731 0.059262 In [5]: df.iat[2, 2] # 第二行第二列元素 Out[5]: 0.40278182653648853 因为行和列的对称关系,因此聚合函数在两个方向上都可以计算,只需指定 axis 即可。 In [6]: df.sum() # 默认 axis == 0,在行方向上做聚合,因此结果是4个元素 Out[6]: 0 2.138135 1 2.961325 2 2.508257 3 2.458257 dtype: float64 In [7]: df.sum(axis=1) # axis == 1,在列方向上做聚合,因此是5个元素 Out[7]: 0 2.874434 1 2.266533 2 1.454032 3 2.201998 4 1.268976 dtype: float64 如果熟悉 numpy(数值计算库,包含多维数组和矩阵的定义),可以看到这个特性非常熟悉,从而可以看出 DataFrame 的矩阵本质。 丰富的 API DataFrame 的 API 非常丰富,横跨关系(如 filter、join)、线性代数(如 transpose、dot)以及类似电子表格(如 pivot)的操作。 还是以 pandas 为例,一个 DataFrame 可以做转置操作,让行和列对调。 In [8]: df.transpose() Out[8]: 0 1 2 3 4 0 0.736385 0.319533 0.440825 0.300279 0.341113 1 0.271232 0.891928 0.500724 0.483571 0.813870 2 0.940270 0.471176 0.402782 0.639299 0.054731 3 0.926548 0.583895 0.109702 0.778849 0.059262 直观的语法,适合交互式分析 用户可以对 DataFrame 数据不断进行探索,查询结果可以被后续的结果复用,可以非常方便地用编程的方式组合非常复杂的操作,很适合交互式的分析。 列中允许异构数据 DataFrame 的类型系统允许一列中有异构数据的存在,比如,一个 int 列中允许有 string 类型数据存在,它可能是脏数据。这点看出 DataFrame 非常灵活。 In [10]: df2 = df.copy() In [11]: df2.iloc[0, 0] = 'a' In [12]: df2 Out[12]: 0 1 2 3 0 a 0.271232 0.940270 0.926548 1 0.319533 0.891928 0.471176 0.583895 2 0.440825 0.500724 0.402782 0.109702 3 0.300279 0.483571 0.639299 0.778849 4 0.341113 0.813870 0.054731 0.059262 数据模型 现在我们可以对什么是真正的 DataFrame 正式下定义: DataFrame 由二维混合类型的数组、行标签、列标签、以及类型(types 或者 domains)组成。在每列上,这个类型是可选的,可以在运行时推断。从行上看,可以把 DataFrame 看做行标签到行的映射,且行之间保证顺序;从列上看,可以看做列类型到列标签到列的映射,同样,列间同样保证顺序。 行标签和列标签的存在,让选择数据时非常方便。 In [13]: df.index = pd.date_range('2020-4-15', periods=5) In [14]: df.columns = ['c1', 'c2', 'c3', 'c4'] In [15]: df Out[15]: c1 c2 c3 c4 2020-04-15 0.736385 0.271232 0.940270 0.926548 2020-04-16 0.319533 0.891928 0.471176 0.583895 2020-04-17 0.440825 0.500724 0.402782 0.109702 2020-04-18 0.300279 0.483571 0.639299 0.778849 2020-04-19 0.341113 0.813870 0.054731 0.059262 In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3'] # 注意这里的切片是闭区间 Out[16]: c2 c3 2020-04-16 0.891928 0.471176 2020-04-17 0.500724 0.402782 2020-04-18 0.483571 0.639299 这里的 index 和 columns 就分别是行和列标签。我们可以很容易选择一段时间(行上选择)和几列(列上选择)数据。当然这些建立在数据是按顺序存储的基础上。 按顺序存储的特性让 DataFrame 非常适合用来做统计方面的工作。 In [17]: df3 = df.shift(1) # 把 df 的数据整体下移一格,行列索引保持不变 In [18]: df3 Out[18]: c1 c2 c3 c4 2020-04-15 NaN NaN NaN NaN 2020-04-16 0.736385 0.271232 0.940270 0.926548 2020-04-17 0.319533 0.891928 0.471176 0.583895 2020-04-18 0.440825 0.500724 0.402782 0.109702 2020-04-19 0.300279 0.483571 0.639299 0.778849 In [19]: df - df3 # 数据减法会自动按标签对齐,因此这一步可以用来计算环比 Out[19]: c1 c2 c3 c4 2020-04-15 NaN NaN NaN NaN 2020-04-16 -0.416852 0.620697 -0.469093 -0.342653 2020-04-17 0.121293 -0.391205 -0.068395 -0.474194 2020-04-18 -0.140546 -0.017152 0.236517 0.669148 2020-04-19 0.040834 0.330299 -0.584568 -0.719587 In [21]: (df - df3).bfill() # 第一行的空数据按下一行填充 Out[21]: c1 c2 c3 c4 2020-04-15 -0.416852 0.620697 -0.469093 -0.342653 2020-04-16 -0.416852 0.620697 -0.469093 -0.342653 2020-04-17 0.121293 -0.391205 -0.068395 -0.474194 2020-04-18 -0.140546 -0.017152 0.236517 0.669148 2020-04-19 0.040834 0.330299 -0.584568 -0.719587 从例子看到,正因为数据是按顺序存放的,因此我们可以索引保持不变,整体下移一行,这样,昨天的数据就到了今天的行上,然后拿原数据减去位移后的数据时,因为 DataFrame 会自动按标签做对齐,因此,对于一个日期,相当于用当天的数据减去了前天的数据,这样就可以做类似于环比的操作。这简直太方便了。试想,对于关系系统来说,恐怕需要想办法找一列作为 join 的条件,然后再做减法等等。最后,对于空数据,我们还可以填充上一行(ffill)或者下一行的数据(bfill)。想在关系系统里想达到同样效果,想必是需要大费周章的。 DataFrame 的真正含义正在被杀死 近几年,DataFrame 系统如同雨后春笋般出现,然而,这其中的绝大多数系统只包含了关系表的语义,并不包含我们之前说的矩阵方面的意义,且它们大多也并不保证数据顺序,因此真正 DataFrame 所拥有的统计和机器学习方面的特质也不复存在。这些 “DataFrame” 系统的出现,让 “DataFrame” 这个词本身几乎变得没有意义。数据科学家们为了处理大规模的数据,思维方式不得不作出改变,这其中必然存在风险。 Spark DataFrame 和 Koalas 不是真正的 DataFrame 这些 DataFrame 系统的代表是 Spark DataFrame, Spark 当然是伟大的,它解决了数据规模的问题;同时又首次把 ”DataFrame“ 的概念带到了大数据的领域。但其实它只是 spark.sql的另一种形式(当然 Spark DataFrame 确实在 spark.sql 下)。Spark DataFrame 只包含了关系表的语义,schema 需要确定,数据也并不保证顺序。 那么会有同学说 Koalas 呢?Koalas 提供了 pandas API,用 pandas 的语法就可以在 spark 上分析了。实际上,因为 Koalas 也是将 pandas 的操作转成 Spark DataFrame 来执行,因为 Spark DataFrame 内核本身的特性,注定 Koalas 只是看上去和 pandas 一致。 为了说明这点,我们使用 数据集(Hourly Ridership by Origin-Destination Pairs),只取 2019 年的数据。 对于 pandas,我们按天聚合,并按 30 天滑动窗口来计算平均值。 In [22]: df = pd.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', ...: names=['Date','Hour','Origin','Destination','Trip Count']) In [23]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() Out[23]: <matplotlib.axes._subplots.AxesSubplot at 0x118077d90> 如果是 Koalas,因为它的 API 看上去和 pandas 一致,因此,我们按照 Koalas 的文档做 import 替换。 In [1]: import databricks.koalas as ks In [2]: df = ks.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', names=['Date','Hour','Origin','Destination','Trip Count']) In [3]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() 然后令人惊讶的是,结果并不一致。大费周章后才查到,原因是顺序问题,聚合的结果后并不保证排序,因此要得到一样的结果需要在 rolling 前加 sort_index(),确保 groupby 后的结果是排序的。 In [4]: df.groupby('Date').mean()['Trip Count'].sort_index().rolling(30).mean().plot() 默认的排序规则非常重要,这对以时间作为索引的数据尤其关键,而且这让数据科学家更容易观察数据,也更容易复现结果。 所以,在使用 Koalas 时请小心,要时刻关注你的数据在你心中是不是排序的,因为 Koalas 很可能表现地和你想的不一致。 让我们再看 shift,它能工作的一个前提就是数据是排序的,那么在 Koalas 中调用会发生什么呢? In [6]: df.shift(1) --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) /usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw) 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: /usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: Py4JJavaError: An error occurred while calling o110.select. : org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING))' due to data type mismatch: argument 1 requires (double or float) type, however, 'lag(`Date`, 1, NULL) OVER (ORDER BY `__natural_order__` ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)' is of timestamp type.;; 'Project [__index_level_0__#41, CASE WHEN (isnull(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)))) THEN null ELSE lag(Date#30, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Date#87, CASE WHEN (isnull(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Hour#31, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Hour#88, CASE WHEN (isnull(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Origin#32, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Origin#89, CASE WHEN (isnull(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as string) ELSE lag(Destination#33, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Destination#90, CASE WHEN (isnull(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) || isnan(cast(lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) as double))) THEN cast(null as int) ELSE lag(Trip Count#34, 1, null) windowspecdefinition(__natural_order__#50L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) END AS Trip Count#91, __natural_order__#50L] +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS __natural_order__#50L] +- Project [__index_level_0__#41, Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34] +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, _w0#42L, _we0#43, (_we0#43 - 1) AS __index_level_0__#41] +- Window [row_number() windowspecdefinition(_w0#42L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#43], [_w0#42L ASC NULLS FIRST] +- Project [Date#30, Hour#31, Origin#32, Destination#33, Trip Count#34, monotonically_increasing_id() AS _w0#42L] +- Project [0#20 AS Date#30, 1#21 AS Hour#31, 2#22 AS Origin#32, 3#23 AS Destination#33, 4#24 AS Trip Count#34] +- Project [_c0#10 AS 0#20, _c1#11 AS 1#21, _c2#12 AS 2#22, _c3#13 AS 3#23, _c4#14 AS 4#24] +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:116) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:108) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:280) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$mapChild$2(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$13.apply(TreeNode.scala:356) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:296) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:356) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:277) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:328) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:326) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) at ... # 后面还有一大段报错信息,此处省略 这个报错可能会让数据科学家们震惊,什么,我就做了个 shift 啊,报错里掺杂着 Java 异常栈和一大堆看不懂的错误。 这里真正的错误和 Date 是时间戳有关,那么我们只取 int 类型的字段做 shift 总可以了吧。 In [10]: df['Hour'].shift(1) Out[10]: 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 20/04/20 17:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 确实可以运行,但却看到一句话,大意是数据会被放到一个分区来执行,这正是因为数据本身之间并不保证顺序,因此只能把数据收集到一起,排序,再调用 shift。这样就不再是一个分布式的程序了,甚至比 pandas 本身更慢。 如 DataFrame.dot 等矩阵相关的操作在 Koalas 里也不包含,这些操作已经很难用关系代数来表达了。 PyODPS DataFrame 相信用过 MaxCompute(原名 ODPS,阿里云自研的大数据系统),应该会听说过 PyODPS。这个库是我们前几年的产品,PyODPS 里也包含一个 DataFrame,而 PyODPS DataFrame 在执行的时候会被编译到 ODPS SQL 来执行。 提 PyODPS DataFrame 的原因是,我们在几年前就发现,虽然它提供了 pandas-like 的接口,一定程度上让用户能用类似 pandas 的思维解决问题,然而,当用户问我们,如何向后填充数据?如何通过索引获取数据?答案都是不能。原因也是一样的,因为 PyODPS DataFrame 只是将计算代理给不保证有序、只有关系代数算子的引擎来执行。 如果系统本身的数据模型不是真正的 DataFrame 模型,仅仅让接口看起来像是远远不够的。 Mars DataFrame 因此这里要说到 Mars DataFrame,其实我们做 Mars 的初衷和这篇 paper 的想法是一致的,因为现有的系统虽然能很好地解决规模问题,但那些传统数据科学包中好的部分却被人遗忘了,我们希望 Mars 能保留这些库中好的部分,又能解决规模问题,也能充分利用新硬件。 Mars DataFrame 会自动将 DataFrame 分割成很多小的 chunk,每个 chunk 也是一个 DataFrame,而无论是 chunk 间还是 chunk 内的数据,都保证顺序。 图里的示例中,一个行数 380、列数 370 的 DataFrame,被 Mars 分成 3x3 一共 9 个 chunk,根据计算在 CPU 还是 NVIDIA GPU 上进行,用 pandas DataFrame 或者 cuDF DataFrame 来存储数据和执行真正的计算。可以看到,Mars 既会在行上,也会在列上进行分割,这种在行上和列上的对等性,让 DataFrame 的矩阵本质能得以发挥。 在单机真正执行时,根据初始数据的位置,Mars 会自动把数据分散到多核或者多卡执行;对于分布式,会将计算分散到多台机器执行。 Mars DataFrame 保留了行标签、列标签和类型的概念。因此能够想象如同 pandas 一样,可以在比较大的数据集上根据标签进行筛选。 In [1]: import mars.dataframe as md In [2]: import mars.tensor as mt In [8]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000), ...: index=md.date_range('2020-1-1', periods=10000)) In [9]: df.loc['2020-4-15'].execute() Out[9]: 0 0.622763 1 0.446635 2 0.007870 3 0.107846 4 0.288893 5 0.219340 6 0.228806 7 0.969435 8 0.033130 9 0.853619 Name: 2020-04-15 00:00:00, dtype: float64 Mars 会保持和 pandas 一致的排序特性,因此对于 groupby 等操作,无需担心结果和所想不一致。 In [6]: import mars.dataframe as md In [7]: df = md.read_csv('Downloads/bart-dataset/date-hour-soo-dest-2019.csv', n ...: ames=['Date','Hour','Origin','Destination','Trip Count']) In [8]: df.groupby('Date').mean()['Trip Count'].rolling(30).mean().plot() # 结果正确 Out[8]: <matplotlib.axes._subplots.AxesSubplot at 0x11ff8ab90> 对于 shift,不光结果正确,而且执行时能利用多核、多卡和分布式的能力。 In [3]: df.shift(1).head(10).execute() Out[3]: Date Hour Origin Destination Trip Count 0 NaN NaN NaN NaN NaN 1 2019-01-01 0.0 12TH 16TH 4.0 2 2019-01-01 0.0 12TH ANTC 1.0 3 2019-01-01 0.0 12TH BAYF 1.0 4 2019-01-01 0.0 12TH CIVC 2.0 5 2019-01-01 0.0 12TH COLM 1.0 6 2019-01-01 0.0 12TH COLS 1.0 7 2019-01-01 0.0 12TH CONC 1.0 8 2019-01-01 0.0 12TH DALY 1.0 9 2019-01-01 0.0 12TH DELN 2.0 不只是 DataFrame Mars 还包含 tensor 模块来支持并行和分布式化 numpy,以及 learn 模块来并行和分布式化 scikit-learn,因此可以想象,如 mars.tensor.linalg.svd 可以直接作用在 Mars DataFrame 上,这就赋予了 Mars 超越 DataFrame 本身的语义。 In [1]: import mars.dataframe as md In [2]: import mars.tensor as mt In [3]: df = md.DataFrame(mt.random.rand(10000, 10, chunk_size=1000)) In [5]: mt.linalg.svd(df).execute() 总结 《Towards Scalable DataFrame Systems》赋予了 DataFrame 学术定义。而要做到可扩展的 DataFrame,首先必须是真正的 DataFrame,其次才是可扩展。 在我们看来,Mars 是真正的 DataFrame,它生来目标就是可扩展,而 Mars 又不仅仅是 DataFrame。在我们看来,Mars 在数据科学领域大有可为。 Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。 参考 Towards Scalable Dataframe Systems:https://arxiv.org/abs/2001.00888 Preventing the Death of the DataFrame:https://towardsdatascience.com/preventing-the-death-of-the-dataframe-8bca1c0f83c8 如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。
2020年12月
2020年10月
from odps import __version__
print(__version__)
看下pyodps版本?
RSS可以自行解析,然后用我们的表上传接口来上传数据。
文档:http://pyodps.readthedocs.io/zh_CN/latest/base-tables-zh.html#id7
另外,可以加PyODPS答疑群(钉钉)
用PyODPS可以写成这样,mysql那边使用sqlalchemy
import itertools
from sqlalchemy import create_engine, MetaData, Table
from odps.df import DataFrame
DB_CONNECT_STR = 'mysql+mysqldb://root:@localhost/mydb?charset=utf8'
engine = create_engine(DB_CONNECT_STR, echo=True)
conn = engine.connect()
metadata = MetaData(engine)
table = Table('mysql_table', metadata, autoload=True)
df = DataFrame(odps.get_table('my_demo_table', project='my_project'))
selected = df.filter(df.pdate == '')['imei', 'time_in', 'ntotalvote', 'ntotalcurr']
records = []
for i, record in zip(itertools.count(1), selected.execute()):
if i % 100:
conn.execute(conn.insert(), [dict(r) for r in records])
records = []
records.append(record)
if records:
conn.execute(conn.insert(), [dict(r) for r in records])
是因为交互式情况下,print或者repr的时候会执行立即执行的方法。在非交互式环境下需要显式调用立即执行的方法。
所以你可以在IDE里:
print(users.count().execute())
或者可以打开interactive选项,这样在print或者repr的时候也执行计算。
from odps jmport options
options.interactive = True
print(users.count())
有两个方法
1、 SQL写成create table as select *,这样再使用tunnel下载
odps.execute_sql('create table my_tmp_table as select ***')
t = odps.get_table('my_tmp_table')
with t.open_reader() as reader:
for record in reader:
2、 使用instance tunnel,可以用tunnel读取instance执行结果。这个会在0.6版本完成,预计在下周末或者下下周初发布。