搭建过程
1 为 DSW 实例配置 RAM 角色
PAI 默认角色仅拥有访问 PAI 内部产品、MaxCompute 和 OSS 的权限,且权限更加精细。基于 PAI 默认角色签发的临时访问凭证,在访问 MaxCompute表时,将拥有等同于 DSW 实例所有者的权限。在开始之前您可以在 DSW 实例详情中确认您当前的角色
2 安装 MaxFrame
通过pip install maxframe 命令直接安装 MaxFrame SDK
!pip install maxframe alibabacloud_credentials -U
3 准备项目
获取 MaxCompute 项目名:可以登录 MaxCompute 控制台,在左侧导航栏选择工作区 > 项目管理,查看 MaxCompute 项目名称。
Endpoint:目标 MaxCompute 项目所在地域的 Endpoint,可根据网络连接方式自行选择。
4 准备数据
import pandas as pd
import numpy as np
from odps import ODPS
from odps.df import DataFrame as ODPSDataFrame
from alibabacloud_credentials import providers
from odps.accounts import CredentialProviderAccount
# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
account=account,
project=PROJECT_NAME,
endpoint=ENDPOINT
)
# 测试数据
data_sets = [
{
"table_name": "product",
"table_schema": "index bigint, product_id bigint, product_name string, current_price bigint",
"source_type": "records",
"records": [
[1, 100, "Nokia", 1000],
[2, 200, "Apple", 5000],
[3, 300, "Samsung", 9000],
[4, 500, "HP", 7000],
],
"lifecycle": 5,
},
{
"table_name": "sales",
"table_schema": "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
"source_type": "records",
"records": [
[1, 1, 100, 101, 2008, 10, 5000],
[2, 2, 300, 101, 2009, 7, 4000],
[3, 4, 100, 102, 2008, 9, 4000],
[4, 5, 200, 102, 2010, 6, 6000],
[5, 8, 300, 102, 2008, 10, 9000],
[6, 9, 100, 102, 2009, 6, 2000],
[7, 13, 500, 104, 2007, 3, 8000],
],
"lifecycle": 5,
},
]
# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
for index, data in enumerate(data_sets):
table_name = data.get("table_name")
table_schema = data.get("table_schema")
source_type = data.get("source_type")
if not table_name or not table_schema or not source_type:
raise ValueError(
f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
)
lifecycle = data.get("lifecycle", 5)
table_name += suffix
print(f"Processing {o.project}.{table_name}...")
if drop_if_exists:
print(f"Deleting {o.project}.{table_name} if exists...")
o.delete_table(table_name, if_exists=True)
o.create_table(
name=table_name,
table_schema=table_schema,
lifecycle=lifecycle,
if_not_exists=True,
)
if source_type == "local_file":
file_path = data.get("file")
if not file_path:
raise ValueError(
f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
)
sep = data.get("sep", ",")
pd_df = pd.read_csv(file_path, sep=sep)
ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
elif source_type == "records":
records = data.get("records")
if not records:
raise ValueError(
f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
)
with o.get_table(table_name).open_writer() as writer:
writer.write(records)
else:
raise ValueError(f"Unknown data set source_type: {source_type}")
print(f"Processed {o.project}.{table_name} Done")
prepare_data(o, data_sets, "_maxframe_demo", True)
使用 MaxFrame 进行数据分析
1 会话初始化
MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信。MaxFrame 会在远端保存中间状态,因此你可以进行交互式操作,一边开发一边验证。
在创建完成后,可以从 session 对象中获得 session_id 和 logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况。
from maxframe.session import new_session
import maxframe.dataframe as md
# 初始化 MaxFrame Session
session = new_session(o)
# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")
# 打印 LogView 地址
print(f"MaxFrame LogView 地址: {session.get_logview_address()}")
2 数据探查
场景是使用 MaxFrame 对商品和销售数据进行分析。这些数据统一存储于两张 MaxCompute 数据表中:商品表包含商品、价格等信息;销售表包含客户、产品、销售数量、销售年份、销售价格等信息。
与 Pandas 一样,在获得 dataframe 对象之后,可以使用 dtypes 来查看数据的 schema。在下例中,md.read_odps_table 会获得 maxframe.DataFrame 对象。
print('--------------Product Table Schema-----------------------')
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
print(product.dtypes)
print('\n--------------Sales Table Schema-----------------------')
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
print(sales.dtypes)
在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算使你可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行。
MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行。
sales.head(5).execute()
3 数据分布式处理
利用 MaxFrame 的功能,我们能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。
1 场景 1:Pandas merge接口的分布式执行
连接两张数据表,以获取 product_maxframe_demo_large 表中所有 sale_id 对应的 product_name 以及该产品的所有 year 和 price
# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
# 这里的df并不会立即执行,除非您使用df.execute()来触发。
# 这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]
execute_result = df.execute()
df.execute() 会触发在 MaxCompute 集群上进行分布式计算,您可以在 Logview 中查看当前计算的执行详情。
在内部测试中,Product 和 Sales 两张数据表的 6.8TB 数据在 MaxFrame 的 merge 算子中仅需 3 分钟即可完成计算。相比之下,在本地使用 Pandas 处理 6.8TB 的数据会超出内存限制,无法执行。
2:Pandas groupby、agg、sort_values 接口的分布式执行
连接 Product 和 Sales 两张数据表,聚合每个产品在 Sales 表中的首次售出年份。
# 聚合每个产品的首次售出年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))
# 计算不同首次售出年份的产品 ID 数量
sum_product_min_year_df = min_year_df.groupby('first_year', as_index=False).agg(total_product_num=('product_id', 'count'))
exe_info = sum_product_min_year_df.execute()
在执行完上述分布式计算后,如果本地内存允许,可以对 execute() 的结果使用 fetch() 将结果拉全部取到本地使用,抓取后的数据会形成 Pandas DataFrame,因此对于 MaxFrame 尚无法支持的场景也可以轻松的和本地的工作流结合。
当然需要考虑取数的耗时、带宽和本地内存的限制,尽可能的在远端完成所有的计算,避免频繁的抓取和上传。
在提取并展示了记录后,您还可以使用 Matplotlib 生成图表来可视化数据。以下是一个示例代码,用于绘制每个首次售出年份的产品数量分布图。
import matplotlib.pyplot as plt
plt.plot(local_df['first_year'],local_df['total_product_num'])
plt.xlabel('First_sale_year')
plt.ylabel('Total_product_num')
plt.title('Saled product distribution by first saled year')
plt.xticks(range(int(local_df['first_year'].min()), int(local_df['first_year'].max()) + 1))
plt.show()

(1) 实现分布式Pandas处理和大语言模型数据处理场景实践体验
MaxFrame作为阿里云提供的一个Python分布式计算框架,它使得利用Pandas进行大规模数据分析成为可能。通过将Pandas的操作分布到集群中的多个节点上执行,MaxFrame显著提高了大数据集上的操作效率。
在实践中,我按照官方提供的最佳实践文档,尝试了使用MaxFrame来加载、转换和分析大型数据集。对于大语言模型的数据处理场景,MaxFrame提供了对文本数据的高效处理能力,包括但不限于文本清洗、分词、向量化等预处理步骤。与传统单机环境下的Pandas相比,MaxFrame可以更快速地完成这些任务,并且能够轻松扩展到更大规模的数据集。
(2) 在公司/工作/学习中的作用
MaxFrame可以在需要处理大量结构化或非结构化数据的环境中发挥重要作用。例如,在金融行业用于风险评估、市场营销中的客户细分、医疗健康领域的基因数据分析等。其强大的数据处理能力和与AI技术的紧密结合,为研究人员和数据科学家提供了一种强有力的数据探索工具,加速了从数据到洞见的转化过程。
2. MaxFrame产品体验评测
(1) 使用过程中遇到的问题及优化建议
在开通和购买服务时,整个流程相对直观,但初次使用者可能会因为对阿里云生态系统的不熟悉而感到些许困惑。官方文档详尽,但对于一些特定配置项的解释不够深入,这可能导致设置过程中的一些障碍。优化建议包括提供更多关于如何选择合适配置的指导,以及更加详细的错误信息提示。
(2) 功能是否满足预期
MaxFrame的Python编程接口设计友好,易于掌握,即使是对分布式计算没有太多经验的开发者也能较快上手。算子丰富多样,覆盖了常见的数据操作需求。不过,对于某些高级功能,如自定义聚合函数的支持还有待加强。总体而言,MaxFrame的功能基本符合预期,但在降低使用门槛方面仍有提升空间,比如简化安装部署过程,提高API文档的质量等。
(3) 可改进的地方或更多功能
针对AI数据处理和Pandas处理场景,MaxFrame已经做得相当不错,但仍有一些地方值得改进。例如,进一步优化性能以支持更大的数据集;增强与其他AI平台和服务(如TensorFlow、PyTorch)的集成度;增加对更多数据源的支持,以便直接读取不同格式的数据文件。此外,还可以考虑开发图形用户界面(GUI),使非技术人员也能够方便地使用该工具。
3. AI数据预处理对比测评
(1) 与其他工具的比较
确实,市场上存在多种数据处理工具,既有商业解决方案也有开源项目。相较于其他工具,MaxFrame的优势在于其紧密集成于阿里云生态系统内,这意味着它可以无缝连接到阿里云提供的各种服务,如对象存储OSS、表格存储TableStore等。同时,由于MaxFrame是专为处理大规模数据设计的,因此在性能方面通常优于普通的单机版工具。然而,MaxFrame也有一些待改进之处,比如开放性不如一些完全开源的项目,交互体验有时也不够流畅。尽管如此,对于那些已经在使用阿里云服务的企业来说,MaxFrame无疑是一个非常有吸引力的选择。