当 Mars 遇上 RAPIDS:用 GPU 以并行的方式加速数据科学

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,支持通过 RAPIDS 平台用 GPU 加速数据科学。

背景

在数据科学世界,Python 是一个不可忽视的存在,且有愈演愈烈之势。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。

Numpy

Numpy 是数值计算的基础包,内部提供了多维数组(ndarray)这样一个数据结构,用户可以很方便地在任意维度上进行数值计算。

image

我们举一个蒙特卡洛方法求解 Pi 的例子。这背后的原理非常简单,现在我们有个半径为1的圆和边长为2的正方形,他们的中心都在原点。现在我们生成大量的均匀分布的点,让这些点落在正方形内,通过简单的推导,我们就可以知道,Pi 的值 = 落在圆内的点的个数 / 点的总数 * 4。

这里要注意,就是随机生成的点的个数越多,结果越精确。

用 Numpy 实现如下:

import numpy as np

N = 10 ** 7  # 1千万个点

data = np.random.uniform(-1, 1, size=(N, 2))  # 生成1千万个x轴和y轴都介于-1和1间的点
inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum()  # 计算到原点的距离小于1的点的个数
pi = 4 * inside / N
print('pi: %.5f' % pi)
AI 代码解读

可以看到,用 Numpy 来进行数值计算非常简单,只要寥寥数行代码,而如果读者习惯了 Numpy 这种面相数组的思维方式之后,无论是代码的可读性还是执行效率都会有巨大提升。

pandas

pandas 是一个强大的数据分析和处理的工具,它其中包含了海量的 API 来帮助用户在二维数据(DataFrame)上进行分析和处理。

pandas 中的一个核心数据结构就是 DataFrame,它可以简单理解成表数据,但不同的是,它在行和列上都包含索引(Index),要注意这里不同于数据库的索引的概念,它的索引可以这么理解:当从行看 DataFrame 时,我们可以把 DataFrame 看成行索引到行数据的这么一个字典,通过行索引,可以很方便地选中一行数据;列也同理。

我们拿 movielens 的数据集 作为简单的例子,来看 pandas 是如何使用的。这里我们用的是 Movielens 20M Dataset.

import pandas as pd

ratings = pd.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})
AI 代码解读

通过一行简单的 pandas.read_csv 就可以读取 CSV 数据,接着按 userId 做分组聚合,求 rating 这列在每组的总和、平均、最大、最小值。

“食用“ pandas 的最佳方式,还是在 Jupyter notebook 里,以交互式的方式来分析数据,这种体验会让你不由感叹:人生苦短,我用 xx(😉)

scikit-learn

scikit-learn 是一个 Python 机器学习包,提供了大量机器学习算法,用户不需要知道算法的细节,只要通过几个简单的 high-level 接口就可以完成机器学习任务。当然现在很多算法都使用深度学习,但 scikit-learn 依然能作为基础机器学习库来串联整个流程。

我们以 K-最邻近算法为例,来看看用 scikit-learn 如何完成这个任务。

import pandas as pd
from sklearn.neighbors import NearestNeighbors

df = pd.read_csv('data.csv')  # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)
AI 代码解读

fit接口就是 scikit-learn 里最常用的用来学习的接口。可以看到整个过程非常简单易懂。

Mars——Numpy、pandas 和 scikit-learn 的并行和分布式加速器

Python 数据科学栈非常强大,但它们有如下几个问题:

  1. 现在是多核时代,这几个库里鲜有操作能利用得上多核的能力。
  2. 随着深度学习的流行,用来加速数据科学的新的硬件层出不穷,这其中最常见的就是 GPU,在深度学习前序流程中进行数据处理,我们是不是也能用上 GPU 来加速呢?
  3. 这几个库的操作都是命令式的(imperative),和命令式相对应的就是声明式(declarative)。命令式的更关心 how to do,每一个操作都会立即得到结果,方便对结果进行探索,优点是很灵活;缺点则是中间过程可能占用大量内存,不能及时释放,而且每个操作之间就被割裂了,没有办法做算子融合来提升性能;那相对应的声明式就刚好相反,它更关心 what to do,它只关心结果是什么,中间怎么做并没有这么关心,典型的声明式像 SQL、TensorFlow 1.x,声明式可以等用户真正需要结果的时候才去执行,也就是 lazy evaluation,这中间过程就可以做大量的优化,因此性能上也会有更好的表现,缺点自然也就是命令式的优点,它不够灵活,调试起来比较困难。

为了解决这几个问题,Mars 被我们开发出来,Mars 在 MaxCompute 团队内部诞生,它的主要目标就是让 Numpy、pandas 和 scikit-learn 等数据科学的库能够并行和分布式执行,充分利用多核和新的硬件。

Mars 的开发过程中,我们核心关注的几点包括:

  1. 我们希望 Mars 足够简单,只要会用 Numpy、pandas 或 scikit-learn 就会用 Mars。
  2. 避免重复造轮子,我们希望能利用到这些库已有的成果,只需要能让他们被调度到多核/多机上即可。
  3. 声明式和命令式兼得,用户可以在这两者之间自由选择,灵活度和性能兼而有之。
  4. 足够健壮,生产可用,能应付各种 failover 的情况。

当然这些是我们的目标,也是我们一直努力的方向。

Mars tensor:Numpy 的并行和分布式加速器

上面说过,我们的目标之一是,只要会用 Numpy 等数据科学包,就会用 Mars。我们直接来看代码,还是以蒙特卡洛为例。变成 Mars 的代码是什么样子呢?

import mars.tensor as mt

N = 10 ** 10

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = (4 * inside / N).execute()
print('pi: %.5f' % pi)
AI 代码解读

可以看到,区别就只有两处:import numpy as np 变成 import mars.tensor as mt ,后续的 np. 都变成 mt. ;pi 在打印之前调用了一下 .execute() 方法。

也就是默认情况下,Mars 会按照声明式的方式,代码本身移植的代价极低,而在真正需要一个数据的时候,通过 .execute() 去触发执行。这样能最大限度得优化性能,以及减少中间过程内存消耗。

这里,我们还将数据的规模扩大了 1000 倍,来到了 100 亿个点。之前 1/1000 的数据量的时候,在我的笔记本上需要 757ms;而现在数据扩大一千倍,光 data 就需要 150G 的内存,这用 Numpy 本身根本无法完成。而使用 Mars,计算时间只需要 3min 44s,而峰值内存只需要 1G 左右。假设我们认为内存无限大,Numpy 需要的时间也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,并且通过声明式的方式,极大减少了中间内存占用。

前面说到,我们试图让声明式和命令式兼得,而使用命令式的风格,只需要在代码的开始配置一个选项即可。

import mars.tensor as mt
from mars.config import options

options.eager_mode = True  # 打开 eager mode 后,每一次调用都会立即执行,行为和 Numpy 就完全一致

N = 10 ** 7

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.linalg.norm(data, axis=1) < 1).sum()
pi = 4 * inside / N  # 不需要调用 .execute() 了
print('pi: %.5f' % pi.fetch())  # 目前需要 fetch() 来转成 float 类型,后续我们会加入自动转换
AI 代码解读

Mars DataFrame:pandas 的并行和分布式加速器

看过怎么样轻松把 Numpy 代码迁移到 Mars tensor ,想必读者也知道怎么迁移 pandas 代码了,同样也只有两个区别。我们还是以 movielens 的代码为例。

import mars.dataframe as md

ratings = md.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute()
AI 代码解读

Mars Learn:scikit-learn 的并行和分布式加速器

Mars Learn 也同理,这里就不做过多阐述了。但目前 Mars learn 支持的 scikit-learn 算法还不多,我们也在努力移植的过程中,这需要大量的人力和时间,欢迎感兴趣的同学一起参与。

import mars.dataframe as md
from mars.learn.neighbors import NearestNeighbors

df = md.read_csv('data.csv')  # 输入是 CSV 文件,包含 20万个向量,每个向量10个元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)  # 这里 fit 的时候也会整体触发执行,因此机器学习的高层接口都是立即执行的
neighbors = nn.kneighbors(df).fetch()  # kneighbors 也已经触发执行,只需要 fetch 数据
AI 代码解读

这里要注意的是,对于机器学习的 fitpredict 等高层接口,Mars Learn 也会立即触发执行,以保证语义的正确性。

RAPIDS:GPU 上的数据科学

相信细心的观众已经发现,GPU 好像没有被提到。不要着急,这就要说到 RAPIDS

在之前,虽然 CUDA 已经将 GPU 编程的门槛降到相当低的一个程度了,但对于数据科学家们来说,在 GPU 上处理 Numpy、pandas 等能处理的数据无异于天方夜谭。幸运的是,NVIDIA 开源了 RAPIDS 数据科学平台,它和 Mars 的部分思想高度一致,即使用简单的 import 替换,就可以将 Numpy、pandas 和 scikit-learn 的代码移植到 GPU 上。

image

其中,RAPIDS cuDF 用来加速 pandas,而 RAPIDS cuML 用来加速 scikit-learn。

对于 Numpy 来说,CuPy 已经很好地支持用 GPU 来加速了,这样 RAPIDS 也得以把重心放在数据科学的其他部分。

CuPy:用 GPU 加速 Numpy

还是蒙特卡洛求解 Pi。

import cupy as cp
 
N = 10 ** 7

data = cp.random.uniform(-1, 1, size=(N, 2))
inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = 4 * inside / N
print('pi: %.5f' % pi)
AI 代码解读

在我的测试中,它将 CPU 的 757ms,降到只有 36ms,提升超过 20 倍,可以说效果非常显著。这正是得益于 GPU 非常适合计算密集型的任务。

RAPIDS cuDF:用 GPU 加速 pandas

import pandas as pd 替换成 import cudf,GPU 内部如何并行,CUDA 编程这些概念,用户都不再需要关心。

import cudf

ratings = cudf.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})
AI 代码解读

运行时间从 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超过 10 倍。

RAPIDS cuML:用 GPU 加速 scikit-learn

同样是 k-最邻近问题。

import cudf
from cuml.neighbors import NearestNeighbors

df = cudf.read_csv('data.csv')
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)
AI 代码解读

运行时间从 CPU 上 1min52s,提升到 GPU 上 17.8s。

Mars 和 RAPIDS 结合能带来什么?

RAPIDS 将 Python 数据科学带到了 GPU,极大地提升了数据科学的运行效率。它们和 Numpy 等一样,是命令式的。通过和 Mars 结合,中间过程将会使用更少的内存,这使得数据处理量更大;Mars 也可以将计算分散到多机多卡,以提升数据规模和计算效率。

在 Mars 里使用 GPU 也很简单,只需要在对应函数上指定 gpu=True。例如创建 tensor、读取 CSV 文件等都适用。

import mars.tensor as mt
import mars.dataframe as md

a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True)
df = md.read_csv('ml-20m/ratings.csv', gpu=True)
AI 代码解读

下图是用 Mars 分别在 Scale up 和 Scale out 两个维度上加速蒙特卡洛计算 Pi 这个任务。一般来说,我们要加速一个数据科学任务,可以有这两种方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的内存、使用 GPU 替代 CPU等;Scale out 就是指用更多的机器,用分布式的方式提升效率。

image

可以看到在一台 24 核的机器上,Mars 计算需要 25.8s,而通过分布式的方式,使用 4 台 24 核的机器的机器几乎以线性的时间提升。而通过使用一个 NVIDIA TESLA V100 显卡,我们就能将单机的运行时间提升到 3.98s,这已经超越了4台 CPU 机器的性能。通过再将单卡拓展到多卡,时间进一步降低,但这里也可以看到,时间上很难再线性扩展了,这是因为 GPU 的运行速度提升巨大,这个时候网络、数据拷贝等的开销就变得明显。

性能测试

我们使用了 https://github.com/h2oai/db-benchmark 的数据集,测试了三个数据规模的 groupby 和 一个数据规模的 join。而我们主要对比了 pandas 和 DASK。DASK 和 Mars 的初衷很类似,也是试图并行和分布式化 Python 数据科学,但它们的设计、实现、分布式都存在较多差异,这个后续我们再撰文进行详细对比。

测试机器配置是 500G 内存、96 核、NVIDIA V100 显卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 执行计算。

Groupby

数据有三个规模,分别是 500M、5G 和 20G。

查询也有三组。

查询一

df = read_csv('data.csv')
df.groupby('id1').agg({'v1': 'sum'})
AI 代码解读

查询二

df = read_csv('data.csv')
df.groupby(['id1', 'id2']).agg({'v1': 'sum'})
AI 代码解读

查询三

df = read_csv('data.csv')
df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})
AI 代码解读

数据大小 500M,性能结果

image

数据大小 5G,性能结果

image

数据大小 20G,性能结果

image

数据大小到 20G 时,pandas 在查询2会内存溢出,得不出结果。

可以看到,随着数据增加,Mars 的性能优势会愈发明显。

得益于 GPU 的计算能力,GPU 运算性能相比于 CPU 都有数倍的提升。如果单纯使用 RAPIDS cuDF,由于显存大小的限制,数据来到 5G 都难以完成,而由于 Mars 的声明式的特点,中间过程对显存的使用大幅得到优化,所以整组测试来到 20G 都能轻松完成。这正是 Mars + RAPIDS 所能发挥的威力。

Join

测试查询:

x = read_csv('x.csv')
y = read_csv('y.csv')
x.merge(y, on='id1')
AI 代码解读

测试数据 x 为500M,y 包含10行数据。

image

总结

RAPIDS 将 Python 数据科学带到了 GPU,极大提升了数据分析和处理的效率。Mars 的注意力更多放在并行和分布式。相信这两者的结合,在未来会有更多的想象空间。

Mars 诞生于 MaxCompute 团队,MaxCompute 原名 ODPS,是一种快速、完全托管的EB级数据仓库解决方案。Mars 即将通过 MaxCompute 提供服务,购买了 MaxCompute 服务的用户届时可以开箱即用体验 Mars 服务。敬请期待。

如果对 Mars 感兴趣,可以关注 Mars 团队专栏,或者钉钉扫二维码加入 Mars 讨论群。

IMG_8215

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
目录
打赏
0
0
0
1
78980
分享
相关文章
推荐场景GPU优化的探索与实践:CUDA Graph与多流并行的比较与分析
RTP 系统(即 Rank Service),是一个面向搜索和推荐的 ranking 需求,支持多种模型的在线 inference 服务,是阿里智能引擎团队沉淀多年的技术产品。今年,团队在推荐场景的GPU性能优化上又做了新尝试——在RTP上集成了Multi Stream,改变了TensorFlow的单流机制,让多流的执行并行,作为增加GPU并行度的另一种选择。本文详细介绍与比较了CUDA Graph与多流并行这两个方案,以及团队的实践成果与心得。
exo:22.1K Star!一个能让任何人利用日常设备构建AI集群的强大工具,组成一个虚拟GPU在多台设备上并行运行模型
exo 是一款由 exo labs 维护的开源项目,能够让你利用家中的日常设备(如 iPhone、iPad、Android、Mac 和 Linux)构建强大的 AI 集群,支持多种大模型和分布式推理。
455 100
DeepSeek开源周第四弹之二!EPLB:专为V3/R1设计的专家并行负载均衡器,让GPU利用率翻倍!
EPLB 是 DeepSeek 推出的专家并行负载均衡器,通过冗余专家策略和负载均衡算法,优化大规模模型训练中的 GPU 资源利用率和训练效率。
74 1
DeepSeek开源周第四弹之二!EPLB:专为V3/R1设计的专家并行负载均衡器,让GPU利用率翻倍!
关于实现Halcon算法加速的基础知识(2)(多核并行/GPU)
关于实现Halcon算法加速的基础知识(多核并行/GPU)
3521 0
关于实现Halcon算法加速的基础知识(2)(多核并行/GPU)
GPU(图形处理单元)因其强大的并行计算能力而备受关注。与传统的CPU相比,GPU在处理大规模数据密集型任务时具有显著的优势。
GPU(图形处理单元)因其强大的并行计算能力而备受关注。与传统的CPU相比,GPU在处理大规模数据密集型任务时具有显著的优势。
【多GPU炼丹-绝对有用】PyTorch多GPU并行训练:深度解析与实战代码指南
本文介绍了PyTorch中利用多GPU进行深度学习的三种策略:数据并行、模型并行和两者结合。通过`DataParallel`实现数据拆分、模型不拆分,将数据批次在不同GPU上处理;数据不拆分、模型拆分则将模型组件分配到不同GPU,适用于复杂模型;数据和模型都拆分,适合大型模型,使用`DistributedDataParallel`结合`torch.distributed`进行分布式训练。代码示例展示了如何在实践中应用这些策略。
2242 2
【多GPU炼丹-绝对有用】PyTorch多GPU并行训练:深度解析与实战代码指南
极智AI | 谈谈GPU并行推理的几个方式
大家好,我是极智视界,本文主要聊一下 GPU 并行推理的几个方式。
532 0
关于实现Halcon算法加速的基础知识(1)(多核并行/GPU)
关于实现Halcon算法加速的基础知识(多核并行/GPU)
1351 0
关于实现Halcon算法加速的基础知识(1)(多核并行/GPU)
2025年阿里云GPU服务器租用价格、选型策略与应用场景详解
随着AI与高性能计算需求的增长,阿里云提供了多种GPU实例,如NVIDIA V100、A10、T4等,适配不同场景。2025年重点实例中,V100实例GN6v单月3830元起,适合大规模训练;A10实例GN7i单月3213.99元起,适用于混合负载。计费模式有按量付费和包年包月,后者成本更低。针对AI训练、图形渲染及轻量级推理等场景,推荐不同配置以优化成本和性能。阿里云还提供抢占式实例、ESSD云盘等资源优化策略,支持eRDMA网络加速和倚天ARM架构,助力企业在2025年实现智能计算的效率与成本最优平衡。 (该简介为原文内容的高度概括,符合要求的字符限制。)
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等