MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。

前言

在当今数字化迅猛发展的时代,数据信息的保存与数据分析对企业的决策和工作方向具有极为重要的指导价值。通过企业数据分析,企业能够精准统计出自身的成本投入、经营收益以及利润等重要数据。这些数据犹如企业运营的“晴雨表”,为企业后续的决策提供了坚实可靠的依据,助力企业在市场竞争中优化经营策略,从而实现更大的价值创造。

今天我们要讲的正是可以帮助企业实现数据保存于数据分析的一款分布式计算框架MaxCompute MaxFrame,那么什么是MaxCompute MaxFrame?

MaxCompute MaxFrame

在开始测评之前,先了解一下什么是MaxCompute MaxFrame?

以下是来自官网的介绍:【MaxFrame是由阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口且自动进行分布式计算。您可利用MaxCompute的海量计算资源及数据进行大规模数据处理、可视化数据探索分析以及科学计算、ML/AI开发等工作。】关于MaxCompute MaxFrame 的更多内容你可以直接在官网中详细了解,包括产品优势、产品功能、应用场景等,这里我主要是测评 MaxCompute MaxFrame 的操作体验,因此对于 MaxCompute MaxFrame 的详细介绍大家可以移步官网:https://www.aliyun.com/product/bigdata/odps/maxframe

image.png 简单介绍完了我们今天的主角 MaxCompute MaxFrame,那么下面就开始今天的操作吧。

服务开通

在开始测试 MaxCompute MaxFrame 的功能前,首先需要开通 MaxCompute和DataWorks 服务。

开通 MaxCompute 服务

对于全新的、此前未开通过MaxCompute的阿里云账号,阿里云在部分地域提供了免费的MaxCompute资源包,您可以先申请免费资源包体验试用,体验地址:https://free.aliyun.com/  在免费试用页面输入你想要试用的服务,比如输入 MaxCompute

image.png

如果你的阿里云账号没有免费试用资格,那么你只能通过 阿里云MaxCompute产品首页 :https://www.aliyun.com/product/maxcompute 单击【立即购买】,选择【按量付费标准版开通MaxCompute服务

image.png

这里我的账号在当前北京地域下已经开通过 MaxCompute 服务,因此这里才有这个提示,正常情况下的话你直接开通即可

image.png

开通完 MaxCompute 服务之后,我们还需要开通 DataWorks 服务。

开通 DataWorks 服务

同样的,对于全新的、此前未开通过DataWorks  的阿里云账号,阿里云在部分地域提供了免费的DataWorks资源包,您可以先申请免费资源包体验试用。免费试用地址同上,在免费试用界面输入 DataWorks  可以查看试用资格

image.png

如果你的阿里云账号没有试用资格的话,你可以到 DataWorks   官网,官网地址: https://www.aliyun.com/product/bigdata/ide

image.png

点击【立即购买】选择需要开通 DataWorks   服务的地域以及 【按量付费】计费方式后开通即可,这里我已经开通过按量付费了

image.png

资源准备完成之后,下面就开始创建资源用于后面的操作。

资源准备

在我们开通了MaxCompute和DataWorks 服务 之后,下面我们就可以创建资源了,下面按照步骤创建资源内容。

创建 DataWorks 工作空间

登录DataWorks控制台在控制台顶部菜单栏切换所需地域,单击左侧导航栏的【工作空间】,进入工作空间列表页面,点击【创建工作空间】,这里我已经创建好了工作空间 User_dataworks

image.png

输入工作空间名称,定义工作空间模式,即工作空间的生产环境和开发环境是否隔离等参数,根据实际情况选择即可

image.png

创建 MaxCompute 项目

登录MaxCompute控制台,在左上角选择地域,选择左侧菜单【项目管理】,点击【新建项目】,这里为了区分后面的测试和生产环境,需要创建两个 MaxCompute 项目空间,这里我已经创建好了

image.png

MaxCompute  新增项目页面,需要输入项目名称,选择 计算资源付费类型、默认Quota 等信息后,点击确定即可完成 MaxCompute 创建。

image.png


创建MaxCompute数据源

完成上述操作之后,回到 DataWorks控制台 ,在 DataWorks 控制台 选择查看 【工作空间】列表页面,点击工作空间名称,进入工作空间详情页面,在工作空间详情页面点击【数据源】-【数据源列表】可以看到这里我已经创建成功的数据源

image.png

点击【新增数据源】,选择 MaxCompute,根据界面指引创建数据源

image.png

选择 MaxCompute 在新增 MaxCompute 数据源页面,我们需要输入 数据源名称 ,所属云账号、地域等信息,可以选择我们刚才创建好的 MaxCompute项目名称

image.png

点击【完成创建】之后,创建完成MaxCompute  数据源,就可以返回【数据源列表】查看已经创建好的数据源信息了。

绑定数据源或集群

等待MaxCompute  数据源创建成功之后, 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的【数据开发与治理】 - 【数据开发】,在下拉框中选择对应工作空间后单击【进入数据开发】

image.png

在左侧导航栏单击【数据源】,进入数据源或集群绑定页面,您可通过名称搜索找到目标数据源或集群进行绑定操作。绑定后,便可基于数据源的连接信息读取该数据源的数据,进行相关开发操作。如果你找不到具体的 数据源 绑定操作入口,又不想去按照官方文档说的那样去个人设置里面找,这里可以直接点击 绑定 进入到绑定页面,在绑定页面选择资源绑定,这里我已经绑定过了

image.png

在开发 PyODPS 3 任务之前,先来简单说一下PyODPS 3 任务。 DataWorks为我们提供PyODPS 3节点,我们可以在该节点中直接使用Python代码编写MaxCompute作业,并进行作业的周期性调度。开始之前需要先创建一个 PyODPS 3节点。

创建MaxCompute节点

登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的【数据开发与治理】 > 【数据开发】,在下拉框中选择对应工作空间后单击【进入数据开发】,在 数据开发页面选择【新建】>【MaxCompute】 > 【PyODPS 3

image.png

在弹框中配置节点的名称,选择路径,完成后单击确认,完成MaxCompute节点创建操作,后续您即可在节点中进行对应MaxCompute任务开发与配置

image.png

节点创建成功之后,可以在数据开发页面看到节点信息

image.png

以上的资源准备好了之后,我们就可以使用DataWorks的PyODPS 3节点开发和运行MaxFrame作业。

在DataWorks中使用MaxFrame

在开始使用MaxFrame之前,先来简单介绍一下。 DataWorks为MaxCompute项目提供任务调度能力,且已在PyODPS 3节点内置了MaxFrame,我们可直接使用DataWorks的PyODPS 3节点开发和运行MaxFrame作业。PyODPS 3内置了MaxCompute用户和项目信息,因此我们可以直接创建MaxFrame会话,复制代码放入创建的 PyODPS 3节点 的 命令操作台

image.png

代码示例如下

import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
options.sql.enable_mcqa = False

table = o.create_table("test_source_table", "a string, b bigint", if_not_exists=True)
with table.open_writer() as writer:
    writer.write([
        ["value1", 0],
        ["value2", 1],
    ])

# 创建MaxFrame session
session = new_session(o)

df = md.read_odps_table("test_source_table",index_col="b")
df["a"] = "prefix_" + df["a"]

# 打印dataframe数据
print(df.execute().fetch())

# MaxFrame DataFrame数据写入MaxCompute表
md.to_odps_table(df, "test_prefix_source_table").execute()

# 销毁 maxframe session
session.destroy()

在数据开发页面命令控制台上侧点击【执行】按钮,执行Python代码可以看到如下的返回结果

image.png

此结果表示MaxFrame安装成功,且已成功连接MaxCompute集群。在目标MaxCompute项目中运行如下SQL,查询test_prefix_source_table表的数据,新建 ODPS SQL 节点

image.png

点击【确认】完成 新建 ODPS SQL 节点 新建 成功之后,在我们新建的 User_sql2 节点输入查询语句

SELECT * FROM test_prefix_source_table;

点击【执行】可以看到sql 查询的结果数据

image.png

到这里就说明我们的 MaxFrame 以及所有需要的服务和资源都可以正常运行,下面来使用与Pandas相同的API来分析数据。

分布式Pandas 处理

在基于MaxFrame实现分布式Pandas处理 之前,首先需要准备一些调用过程中需要用到的ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET 、your-default-project、your-end-point。 这里 进入AccessKey管理页面获取AccessKey ID以及对应的AccessKey Secret

image.png

复制 后备用。登录MaxCompute控制台,在左侧导航栏选择【工作区】-【项目管理】,查看MaxCompute项目名称

image.png

Endpoint 页面找到当前地域对应的 Endpoint 并复制,

image.png

替换掉示例代码中对应的上述获取的账号信息,这里给出的是示例代码,替换后的代码这里不方便给出哈

from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
import os

o = ODPS(
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用AccessKey ID和 AccessKey Secret字符串。
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

data_sets = [{
    "table_name": "product",
    "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
    "source_type": "records",
    "records" : [
        [1, 100, 'Nokia', 1000],
        [2, 200, 'Apple', 5000],
        [3, 300, 'Samsung', 9000]
    ],
},
{
    "table_name" : "sales",
    "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
    "source_type": "records",
    "records" : [
        [1, 1, 100, 101, 2008, 10, 5000],
        [2, 2, 300, 101, 2009, 7, 4000],
        [3, 4, 100, 102, 2011, 9, 4000],
        [4, 5, 200, 102, 2013, 6, 6000],
        [5, 8, 300, 102, 2015, 10, 9000],
        [6, 9, 100, 102, 2015, 6, 2000]
    ],
    "lifecycle": 5
}]

def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")
        
        if not table_name or not table_schema or not source_type:
            raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix
        
        print(f"Processing {table_name}...")
        if drop_if_exists:
            print(f"Deleting {table_name}...")
            o.delete_table(table_name, if_exists=True)
        
        o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
        elif source_type == 'records':
            records = data.get("records")
            if not records:
                raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")
        
        print(f"Processed {table_name} Done")

prepare_data(o, data_sets, "_maxframe_demo", True)

这里我们新建 PyODPS 3节点 User_node2 来执行替换了密钥信息后的上述示例代码,等待运行成功

image.png

查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下

--查询sales_maxframe_demo表
SELECT * FROM sales_maxframe_demo;

image.png

--查询product_maxframe_demo表数据
SELECT * FROM product_maxframe_demo;

image.png

这里需要说明一下,我没有执行结束,在执行的过程中,一直执行超时,不知道什么原因


Executing user script with PyODPS 0.12.1 (wrapper version: 0.12.1spawn)

Processing product_maxframe_demo...
Deleting product_maxframe_demo...

/opt/taobao/tbdpapp/pyodps/pyodpswrapper.py:1191: UserWarning: Global variable __doc__ you are about to set conflicts with pyodpswrapper or builtin variables. It might not be runnable with multiprocessing.
  "It might not be runnable with multiprocessing." % key
2025-01-07 20:34:40,348 WARNING:urllib3.connectionpool:Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f48ac8fb198>, 'Connection to service.cn-beijing.maxcompute.aliyun.com timed out. (connect timeout=120)')': /api/tenants?curr_project=user_project_dev

总的来说

MaxFrame可以在分布式环境下使用与Pandas相同的API来分析数据,通过MaxFrame,您能够以高于开源Pandas数十倍的性能在MaxCompute上快速完成数据分析和计算工作。MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。

反馈建议

  1. 在本次体验 分布式Python计算服务MaxFrame 的功能时,关于操作 基于MaxFrame实现分布式Pandas处理 服务开通的文档中有一段关于开通 MaxCompute和DataWorks 服务的描述,文档中给出的【立即开通】和【立即购买】的跳转链接是同一个链接,跳转页面都是跳转到 MaxCompute 服务【立即购买】页面,对用户操作有误导的作用,文档地址:https://help.aliyun.com/zh/maxcompute/getting-started/activate-maxcompute-and-dataworks?spm=a2c4g.11186623.0.0.5582e92bq1s4XR#34249b30f60px

image.png

  1. 在数据开发页面,我想要创建一个 ODPS SQL 的节点,选择了【数据开发】节点,输入了 节点名称 User_sql,但是在点击【确认】时却提示创建失败,说已经存在当前名称,但是实际是没有存在的,不知道具体什么原因

    image.png

3.在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据的这一步,对于预计执行时间没有一个大概的预估,导致在实际操作的时候并不知道大概完成时间,希望可以在操作文档中补充一下

image.png

4.在执行 MaxFrame使用常用的Pandas算子 进行数据查询对比时,遇到了这样的问题,一直提示超时,查阅了文档也没有找到具体超时应该怎么处理,这里给出的提示信息也没有什么参考的价值

image.png

希望后面可以补充一下具体遇到这种问题应该怎么处理。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
1月前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
29天前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
66 7
|
27天前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
47 1
|
23天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
197 92
|
3月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
697 7
|
3月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
90 2
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
3月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
151 1
|
21天前
|
存储 搜索推荐 大数据
数据大爆炸:解析大数据的起源及其对未来的启示
数据大爆炸:解析大数据的起源及其对未来的启示
81 15
数据大爆炸:解析大数据的起源及其对未来的启示
|
13天前
|
分布式计算 大数据 流计算
玩转数据:初学者的大数据处理工具指南
玩转数据:初学者的大数据处理工具指南
67 14