技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。

引言

随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
1111.png


一、MaxFrame产品最佳实践测评

1.1 分布式Pandas处理的最佳实践

1111.png

环境准备

为了实现基于MaxFrame的分布式Pandas处理,首先需要确保环境已经正确配置了MaxCompute服务,并安装了必要的Python库。以下是环境搭建的基本步骤:

# 安装maxcompute-python-sdk
pip install pyodps

# 安装其他依赖项如pandas等
pip install pandas numpy


##### 数据准备
1.在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。
```bash
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
import os

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

data_sets = [{
   
    "table_name": "product",
    "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
    "source_type": "records",
    "records" : [
        [1, 100, 'Nokia', 1000],
        [2, 200, 'Apple', 5000],
        [3, 300, 'Samsung', 9000]
    ],
},
{
   
    "table_name" : "sales",
    "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
    "source_type": "records",
    "records" : [
        [1, 1, 100, 101, 2008, 10, 5000],
        [2, 2, 300, 101, 2009, 7, 4000],
        [3, 4, 100, 102, 2011, 9, 4000],
        [4, 5, 200, 102, 2013, 6, 6000],
        [5, 8, 300, 102, 2015, 10, 9000],
        [6, 9, 100, 102, 2015, 6, 2000]
    ],
    "lifecycle": 5
}]

def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {table_name}...")
        if drop_if_exists:
            print(f"Deleting {table_name}...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
        elif source_type == 'records':
            records = data.get("records")
            if not records:
                raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {table_name} Done")

prepare_data(o, data_sets, "_maxframe_demo", True)

##### 使用MaxFrame进行分布式处理
现在我们来展示如何使用MaxFrame执行分布式操作。以下代码片段展示了如何加载数据到MaxFrame中并执行一些基本的操作,例如过滤和聚合。

```python
from odps import ODPS
from odps.df import DataFrame

# 初始化ODPS客户端
odps = ODPS('<your-access-id>', '<your-secret-access-key>', '<your-project>', endpoint='<your-endpoint>')

# 将本地pandas DataFrame转换为MaxCompute DataFrame
max_df = DataFrame(df)

# 执行分布式过滤操作
filtered_df = max_df[max_df['value'] > 0.5]

# 执行分布式聚合操作
aggregated_df = filtered_df.groupby('id').agg({
   'value': 'sum'})

# 将结果转换回pandas DataFrame查看
result = aggregated_df.to_pandas()
print(result)
性能评估

为了评估MaxFrame在分布式Pandas处理方面的性能,我们可以通过比较相同任务在单机环境下的执行时间和在MaxFrame上的执行时间来进行对比。通常情况下,对于大规模数据集,MaxFrame能够显著缩短处理时间。

1.2 大语言模型数据处理的最佳实践

数据预处理

在训练大型语言模型时,数据预处理是非常重要的一步。MaxFrame可以帮助加速这一过程,特别是当处理海量文本数据时。下面的例子展示了如何使用MaxFrame清洗和格式化文本数据以供后续训练使用。

# 假设有一个包含文本数据的大表
text_data = odps.get_table('large_text_corpus')

# 使用MaxFrame读取表格内容
text_df = DataFrame(text_data)

# 对文本进行初步清理(去除HTML标签、特殊字符等)
cleaned_text_df = text_df.map(lambda row: (row.id, clean_html(row.text)), schema='id string, cleaned_text string')

# 存储清理后的数据到新表中
cleaned_text_df.persist('cleaned_large_text_corpus')

参数说明:

ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint。
2.查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下。

--查询sales_maxframe_demo表
SELECT * FROM sales_maxframe_demo;

--返回
+------------+------------+------------+------------+------------+------------+------------+
| index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
+------------+------------+------------+------------+------------+------------+------------+
| 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
| 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
| 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
| 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
| 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
| 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
+------------+------------+------------+------------+------------+------------+------------+


--查询product_maxframe_demo表数据
SELECT * FROM product_maxframe_demo;

--返回
+------------+------------+--------------+---------------+
| index      | product_id | product_name | current_price |
+------------+------------+--------------+---------------+
| 1          | 100        | Nokia        | 1000          |
| 2          | 200        | Apple        | 5000          |
| 3          | 300        | Samsung      | 9000          |
+------------+------------+--------------+---------------+

使用MaxFrame进行数据分析

1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id对应的product_name以及该产品的所有year和price

示例代码:

from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import os

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

session = new_session(o)

#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)

sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")

#这里的df并不会立即执行,除非您使用df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]

print(df.execute().fetch())

#保存结果到MaxCompute表中,并销毁Session
md.to_odps_table(df, "result_df", overwrite=True).execute()

session.destroy()

返回结果:

index product_name  year  price                   
1            Nokia  2008   5000
2          Samsung  2009   4000
3            Nokia  2011   4000
4            Apple  2013   6000
5          Samsung  2015   9000
6            Nokia  2015   2000

性能对比
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:
1111.png

2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格
示例代码:

from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import os

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

session = new_session(o)

#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)

# 聚合获取每个产品的第一个年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))

# join 找到对应的销售记录
sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
result_df = md.merge(sales, min_year_df, 
                        left_index=True, 
                        right_on=['product_id','first_year'],
                        how='inner')

#这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]

print(result_df.execute().fetch())

#销毁 Session
session.destroy()

返回结果:

    product_id  first_year  quantity  price
100         100        2008        10   5000
300         300        2009         7   4000
200         200        2013         6   6000

性能对比:
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

1111.png

MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。


二、MaxFrame在公司/工作/学习中的作用

2.1 提升数据分析效率

MaxFrame使得数据科学家和工程师能够在熟悉的Python环境中高效地处理大规模数据集,而无需担心底层基础设施的问题。通过简化复杂的ETL流程,团队可以更快地迭代实验,提高项目开发速度。

2.2 加速AI模型开发周期

对于从事机器学习和深度学习的研究人员来说,MaxFrame提供了强大的工具链来支持从数据收集到模型部署的整个生命周期。它允许用户无缝地在本地和云端之间切换,从而更好地利用计算资源。

2.3 促进跨部门协作

由于MaxFrame与MaxCompute Notebook、镜像管理等功能紧密结合,形成了完整的Python开发生态系统,不同专业背景的团队成员可以在统一平台上合作,共同推进项目的进展。

2.4 支持创新应用探索

最后但同样重要的是,MaxFrame为企业和个人开发者提供了一个理想的平台去尝试新的想法和技术。无论是构建智能推荐系统还是自然语言处理解决方案,MaxFrame都能为用户提供所需的灵活性和支持。


最后附上官方整理出来的常见问题:

问题1:报错invalid type INT for function UDF definition, you need to set odps.sql.type.system.odps2=true; in order to use it
报错原因:在未开启MaxCompute 2.0数据类型版本的情况下,使用MaxCompute 2.0的数据类型,导致作业执行时出现错误。
解决方案:通过Flag开启MaxCompute 2.0数据类型,示例如下:

from maxframe import config
# 在new_session之前添加
config.options.sql.settings = {
   
    "odps.sql.type.system.odps2": "true"
}

问题2:报错UDF : No module named 'cloudpickle'
报错原因:缺少依赖的cloudpickle包。
解决方案:引用MaxCompute基础镜像,示例如下:

from maxframe import config
# 在new_session之前添加
config.options.sql.settings = {
   
    "odps.session.image": "common",
}

问题3:如何在DataFrame提交(apply)的UDF中实现资源复用?
在部分UDF场景中,可能涉及到某些较多的资源创建或者销毁行为(例如初始化数据库连接、加载模型等),希望在每个UDF被加载时只会执行一次。可以利用Python中函数参数默认值只被初始化一次的特性,实现资源复用。例如,下述UDF中,模型只会被加载一次。

def predict(s, _ctx={
   }):
    from ultralytics import YOLO
    # _ctx 的初始值是一个空 dict,在 Python 执行过程中只会被初始化一次。
    # 使用模型时,可以先判断 _ctx 中是否存在该模型,不存在则执行加载,然后存入 dict 中。
    if not _ctx.get("model", None):
        model = YOLO(os.path.join("./", "yolo11n.pt"))
        _ctx["model"] = model
    model = _ctx["model"]

    # 后续调用模型的相关接口

下面给出了一个需要销毁资源的UDF示例,该示例中使用了一个自定义的类MyConnector负责创建和关闭数据库连接。

class MyConnector:

    def __init__(self):
        # 在 __init__ 中创建数据库连接
        self.conn = create_connection()

    def __del__(self):
        # 在 __del__ 中关闭数据库连接
        try:
            self.conn.close()
        except:
            pass


def process(s, connector=MyConnector()):
    # 直接调用 connector 内的数据库连接,无需在 UDF 内部再次执行连接创建和关闭
    connector.conn.execute("xxxxx")

通过对MaxFrame产品的深入体验和评测,我们可以看到它不仅是一个强大的分布式计算框架,而且是连接大数据和AI领域的桥梁。无论是在提升数据分析效率、加速AI模型开发周期,还是促进跨部门协作方面,MaxFrame都展现出了巨大的潜力。未来,随着更多功能的不断加入和完善,相信MaxFrame将继续引领云计算时代的创新发展潮流。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2天前
|
JavaScript 前端开发 Android开发
【03】仿站技术之python技术,看完学会再也不用去购买收费工具了-修改整体页面做好安卓下载发给客户-并且开始提交网站公安备案-作为APP下载落地页文娱产品一定要备案-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
【03】仿站技术之python技术,看完学会再也不用去购买收费工具了-修改整体页面做好安卓下载发给客户-并且开始提交网站公安备案-作为APP下载落地页文娱产品一定要备案-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
33 13
【03】仿站技术之python技术,看完学会再也不用去购买收费工具了-修改整体页面做好安卓下载发给客户-并且开始提交网站公安备案-作为APP下载落地页文娱产品一定要备案-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
|
4天前
|
数据采集 JavaScript Android开发
【02】仿站技术之python技术,看完学会再也不用去购买收费工具了-本次找了小影-感觉页面很好看-本次是爬取vue需要用到Puppeteer库用node.js扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
【02】仿站技术之python技术,看完学会再也不用去购买收费工具了-本次找了小影-感觉页面很好看-本次是爬取vue需要用到Puppeteer库用node.js扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
28 7
【02】仿站技术之python技术,看完学会再也不用去购买收费工具了-本次找了小影-感觉页面很好看-本次是爬取vue需要用到Puppeteer库用node.js扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-优雅草卓伊凡
|
4天前
|
JavaScript 搜索推荐 Android开发
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
23 8
【01】仿站技术之python技术,看完学会再也不用去购买收费工具了-用python扒一个app下载落地页-包括安卓android下载(简单)-ios苹果plist下载(稍微麻烦一丢丢)-客户的麻将软件需要下载落地页并且要做搜索引擎推广-本文用python语言快速开发爬取落地页下载-优雅草卓伊凡
|
8天前
|
API Python
python泛微e9接口开发
通过POST请求向指定IP的API注册设备以获取`secrit`和`spk`。请求需包含`appid`、`loginid`、`pwd`等头信息。响应中包含状态码、消息及`secrit`(注意拼写)、`secret`和`spk`字段。示例代码使用`curl`命令发送请求,成功后返回相关信息。
30 5
|
8天前
|
API 文件存储 Python
python 群晖nas接口(二)
这段代码展示了如何通过API将文件上传到群晖NAS。它使用`requests`库发送POST请求,指定文件路径、创建父级目录及覆盖同名文件的参数,并打印上传结果。确保替换`yourip`和`sid`为实际值。
25 2
|
10天前
|
API 文件存储 数据安全/隐私保护
python 群晖nas接口(一)
这段代码展示了如何通过群晖NAS的API获取认证信息(SID)并列出指定文件夹下的所有文件。首先,`get_sid()`函数通过用户名和密码登录NAS,获取会话ID(SID)。接着,`list_file(filePath, sid)`函数使用该SID访问FileStation API,列出给定路径`filePath`下的所有文件。注意需替换`yourip`、`username`和`password`为实际值。
51 18
|
23天前
|
存储 缓存 Java
Python高性能编程:五种核心优化技术的原理与Python代码
Python在高性能应用场景中常因执行速度不及C、C++等编译型语言而受质疑,但通过合理利用标准库的优化特性,如`__slots__`机制、列表推导式、`@lru_cache`装饰器和生成器等,可以显著提升代码效率。本文详细介绍了这些实用的性能优化技术,帮助开发者在不牺牲代码质量的前提下提高程序性能。实验数据表明,这些优化方法能在内存使用和计算效率方面带来显著改进,适用于大规模数据处理、递归计算等场景。
58 5
Python高性能编程:五种核心优化技术的原理与Python代码
|
1月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
80 7
|
1月前
|
API Python
【02】优雅草央央逆向技术篇之逆向接口协议篇-以小红书为例-python逆向小红书将用户名转换获得为uid-优雅草央千澈
【02】优雅草央央逆向技术篇之逆向接口协议篇-以小红书为例-python逆向小红书将用户名转换获得为uid-优雅草央千澈
95 1
|
1月前
|
安全 数据挖掘 编译器
【01】优雅草央央逆向技术篇之逆向接口协议篇-如何用python逆向接口协议?python逆向接口协议的原理和步骤-优雅草央千澈
【01】优雅草央央逆向技术篇之逆向接口协议篇-如何用python逆向接口协议?python逆向接口协议的原理和步骤-优雅草央千澈
66 6

热门文章

最新文章

推荐镜像

更多