用 Mars Remote API 轻松分布式执行 Python 函数

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。

Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。

启动 Mars 分布式环境可以参考:

  1. 命令行方式在集群中部署
  2. Kubernetes 中部署
  3. MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。

拿用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。

from typing import List
import numpy as np

def calc_chunk(n: int, i: int):
    # 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs: List[int], N: int):
    # 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [calc_chunk(n, i)
      for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)

%%time 下可以看到结果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s

在单机需要 12.3 s。

要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。

import mars.remote as mr

# 函数调用改成 mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))
      for i in range(N // n)]
# 把 spawn 的列表传入作为参数,再 spawn 新的函数
pi = mr.spawn(calc_pi, args=(fs, N))
# 通过 execute() 触发执行,fetch() 获取结果
print(pi.execute().fetch())

%%time 下看到结果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s

结果一模一样,但是却有数倍的性能提升。

可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。

一个例子

为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。

困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。

准备数据

这个例子里我们使用 otto 数据集

首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

def gen_data():
    df = pd.read_csv('otto/train.csv')
    
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    
    return train_test_split(X, y, test_size=0.33, random_state=123)

X_train, X_test, y_train, y_test = gen_data()

模型

接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。

RandomForest:

from sklearn.ensemble import RandomForestClassifier

def random_forest(X_train: pd.DataFrame, 
                  y_train: pd.Series, 
                  verbose: bool = False,
                  **kw):
    model = RandomForestClassifier(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model

接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。

def gen_random_forest_parameters():
    for n_estimators in [50, 100, 600]:
        for max_depth in [None, 3, 15]:
            for criterion in ['gini', 'entropy']:
                yield {
                    'n_estimators': n_estimators,
                    'max_depth': max_depth,
                    'criterion': criterion
                }

LogisticRegression 也是这个过程。我们先定义模型。

from sklearn.linear_model import LogisticRegression

def logistic_regression(X_train: pd.DataFrame,
                        y_train: pd.Series,
                        verbose: bool = False,
                        **kw):
    model = LogisticRegression(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model

接着生成供 LogisticRegression 使用的超参。

def gen_lr_parameters():
    for penalty in ['l2', 'none']:
        for tol in [0.1, 0.01, 1e-4]:
            yield {
                'penalty': penalty,
                'tol': tol
            }

XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。

from xgboost import XGBClassifier

def xgb(X_train: pd.DataFrame,
        y_train: pd.Series,
        verbose: bool = False,
        **kw):
    model = XGBClassifier(verbosity=int(verbose), **kw)
    model.fit(X_train, y_train)
    return model

生成一系列超参。

def gen_xgb_parameters():
    for n_estimators in [100, 600]:
        for criterion in ['gini', 'entropy']:
            for learning_rate in [0.001, 0.1, 0.5]:
                yield {
                    'n_estimators': n_estimators,
                    'criterion': criterion,
                    'learning_rate': learning_rate
                }

验证

接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。

from sklearn.metrics import log_loss

def metric_model(model, 
                 X_test: pd.DataFrame,
                 y_test: pd.Series) -> float:
    if isinstance(model, bytes):
        model = pickle.loads(model)
    y_pred = model.predict_proba(X_test)
    return log_loss(y_test, y_pred)


def train_and_metric(train_func,
                     train_params: dict,
                     X_train: pd.DataFrame, 
                     y_train: pd.Series, 
                     X_test: pd.DataFrame, 
                     y_test: pd.Series,
                     verbose: bool = False
                     ):
    # 把训练和验证封装到一起
    model = train_func(X_train, y_train, verbose=verbose, **train_params)
    metric = metric_model(model, X_test, y_test)
    return model, metric

找出最好的模型

做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。

results = []

# -------------
# Random Forest
# -------------

for params in gen_random_forest_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(random_forest, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})
    
# -------------------
# Logistic Regression
# -------------------

for params in gen_lr_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(logistic_regression, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})
    
# -------
# XGBoost
# -------
    
for params in gen_xgb_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(xgb, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})

运行一下,需要相当长时间,我们省略掉一部分输出内容。

calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}
metric: 0.6912312790832288
# 省略其他模型的输出结果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s

从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。

使用 Remote API 分布式加速

现在我们尝试使用 Remote API 通过分布式方式加速整个过程。

集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。

n_cores = 8
mem = 2 * n_cores  # 16G
# o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')

为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。

if not o.exist_resource('otto_train.csv'):
    with open('otto/train.csv') as f:
        # 上传资源
        o.create_resource('otto_train.csv', 'file', fileobj=f)
        
def gen_data():
    # 改成从资源读取
    df = pd.read_csv(o.open_resource('otto_train.csv'))
    
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    
    return train_test_split(X, y, test_size=0.33, random_state=123)

稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。

import mars.remote as mr

# n_output 说明是 4 输出
# execute() 执行后,数据会读取到 Mars 集群内部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。

def distributed_train_and_metric(train_func,
                                 train_params: dict,
                                 X_train: pd.DataFrame, 
                                 y_train: pd.Series, 
                                 X_test: pd.DataFrame, 
                                 y_test: pd.Series,
                                 verbose: bool = False
                                 ):
    model, metric = train_and_metric(train_func, train_params,
                                     X_train, y_train, 
                                     X_test, y_test, verbose=verbose)
    return pickle.dumps(model), metric

后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。

接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。

import numpy as np

tasks = []
models = []
metrics = []

# -------------
# Random Forest
# -------------

for params in gen_random_forest_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
        args=(random_forest, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和评价分别存储
    models.append(task[0])
    metrics.append(task[1])
    
    
# -------------------
# Logistic Regression
# -------------------

for params in gen_lr_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
        args=(logistic_regression, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和评价分别存储
    models.append(task[0])
    metrics.append(task[1])

# -------
# XGBoost
# -------
    
for params in gen_xgb_parameters():
    # fixed random_state
    params['random_state'] = 123
    # 再指定并发为核的个数
    params['n_jobs'] = n_cores
    task = mr.spawn(distributed_train_and_metric,
        args=(xgb, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和评价分别存储
    models.append(task[0])
    metrics.append(task[1])


# 把顺序打乱,目的是能分散到 worker 上平均一点
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()

可以看到代码几乎一致。

运行查看结果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s

时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。

细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。

以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。

print(models[0].fetch_log())

输出我们简略一部分。

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
# 中间省略
building tree 49 of 50
building tree 50 of 50

要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。

想要了解 fetch_log 接口,可以查看 文档

还有更多

Mars Remote API 的能力其实不止这些,举个例子,在 remote 内部可以 spawn 新的函数;也可以调用 Mars tensor、DataFrame 或者 learn 的算法。这些内容,读者们可以先行探索,后续我们再写别的文章介绍。

总结

Mars Remote API 通过并行和分布式 Python 函数,用很小的修改代价,极大提升了执行效率。

对 Mars 项目感兴趣的读者们,欢迎 star Github 项目,以及订阅我们的专栏。

联系我们

除了可以在 Github Issues 和我们联系,也可以加入钉钉群 32697156 和我们交流。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
16天前
|
分布式计算 DataWorks 大数据
分布式Python计算服务MaxFrame测评
一文带你入门分布式Python计算服务MaxFrame
67 22
分布式Python计算服务MaxFrame测评
|
11天前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
19天前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
41 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
7天前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
43 7
|
5天前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
29 1
|
14天前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
14天前
|
数据采集 供应链 API
Python爬虫与1688图片搜索API接口:深度解析与显著收益
在电子商务领域,数据是驱动业务决策的核心。阿里巴巴旗下的1688平台作为全球领先的B2B市场,提供了丰富的API接口,特别是图片搜索API(`item_search_img`),允许开发者通过上传图片搜索相似商品。本文介绍如何结合Python爬虫技术高效利用该接口,提升搜索效率和用户体验,助力企业实现自动化商品搜索、库存管理优化、竞品监控与定价策略调整等,显著提高运营效率和市场竞争力。
53 3
|
15天前
|
Python
[oeasy]python057_如何删除print函数_dunder_builtins_系统内建模块
本文介绍了如何删除Python中的`print`函数,并探讨了系统内建模块`__builtins__`的作用。主要内容包括: 1. **回忆上次内容**:上次提到使用下划线避免命名冲突。 2. **双下划线变量**:解释了双下划线(如`__name__`、`__doc__`、`__builtins__`)是系统定义的标识符,具有特殊含义。
26 3
|
15天前
|
分布式计算 数据处理 MaxCompute
云产品评测|分布式Python计算服务MaxFrame
云产品评测|分布式Python计算服务MaxFrame
51 2
|
22天前
|
JavaScript API C#
【Azure Developer】Python代码调用Graph API将外部用户添加到组,结果无效,也无错误信息
根据Graph API文档,在单个请求中将多个成员添加到组时,Python代码示例中的`members@odata.bind`被错误写为`members@odata_bind`,导致用户未成功添加。
42 10