加速数据处理与AI开发的利器:阿里云MaxFrame实验评测

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 随着数据量的爆炸式增长,传统数据分析方法逐渐显现出局限性。Python作为数据科学领域的主流语言,因其简洁易用和丰富的库支持备受青睐。阿里云推出的MaxFrame是一个专为Python开发者设计的分布式计算框架,旨在充分利用MaxCompute的强大能力,提供高效、灵活且易于使用的工具,应对大规模数据处理需求。MaxFrame不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。

前言

随着数据量的爆炸式增长,传统的数据分析和处理方法逐渐显现出其局限性,尤其是在面对海量数据集时,计算资源的瓶颈和处理速度成为了亟待解决的问题。与此同时,Python作为一门广泛应用于数据科学、机器学习和人工智能领域的编程语言,因其简洁易用的语法和丰富的库支持而备受青睐。

正是在这样的背景下,阿里云推出了MaxFrame——一个专为Python开发者设计的分布式计算框架。MaxFrame旨在充分利用MaxCompute这一云原生大数据计算平台的强大能力,提供给用户一个高效、灵活且易于使用的工具,以应对大规模数据处理的需求。它不仅继承了Pandas等流行数据处理库的友好接口,还通过集成先进的分布式计算技术,显著提升了数据处理的速度和效率。

虽然我确实对MaxCompute和DataWorks等大数据产品比较熟悉,但日常工作更多是集中在管理和架构层面,几乎没有涉及过底层开发。因此,这次撰写关于MaxFrame的文章,对我来说既是一次挑战也是一次宝贵的学习机会,希望通过深入探索这款工具的功能和应用场景,能为读者带来有价值见解的同时,也能进一步提升自己在分布式计算和大数据处理方面的能力。

产品概述

一句话总结就是:MaxFrame对接MaxCompute,提供Python编程接口,完全兼容Pandas API,并能自动执行分布式计算。

image.png

MaxFrame商品销售分析实验

安装 MaxFrame

通过执行 pip install maxframe 命令直接安装 MaxFrame SDK,安装 alibabacloud_credentials 依赖,以便实现免密登录。

!pip install maxframe alibabacloud_credentials -U

image.png

准备项目

准备 MaxCompute 项目,并将其绑定到当前 DSW 空间,用于后续计算

登录 MaxCompute 控制台,在左侧导航栏选择【工作区 > 项目管理】,查看 MaxCompute 项目名称

image.png

目标 MaxCompute 项目所在地域的 Endpoint,可根据网络连接方式自行选择,例如 http://service.cn-chengdu.maxcompute.aliyun.com/api ,我这里选择的是杭州

image.png

https://service.cn-hangzhou.maxcompute.aliyun.com/api

最终准备如下:

PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"

准备数据

创建实验所需的小规模测试数据

import pandas as pd
import numpy as np
from odps import ODPS
from odps.df import DataFrame as ODPSDataFrame

from alibabacloud_credentials import providers
from odps.accounts import CredentialProviderAccount

# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    PROJECT_NAME = "[你的项目名]"
    ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)

# 测试数据
data_sets = [
    {
   
        "table_name": "product",
        "table_schema": "index bigint, product_id bigint, product_name string, current_price bigint",
        "source_type": "records",
        "records": [
            [1, 100, "Nokia", 1000],
            [2, 200, "Apple", 5000],
            [3, 300, "Samsung", 9000],
            [4, 500, "HP", 7000],
        ],
        "lifecycle": 5,
    },
    {
   
        "table_name": "sales",
        "table_schema": "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
        "source_type": "records",
        "records": [
            [1, 1, 100, 101, 2008, 10, 5000],
            [2, 2, 300, 101, 2009, 7, 4000],
            [3, 4, 100, 102, 2008, 9, 4000],
            [4, 5, 200, 102, 2010, 6, 6000],
            [5, 8, 300, 102, 2008, 10, 9000],
            [6, 9, 100, 102, 2009, 6, 2000],
            [7, 13, 500, 104, 2007, 3, 8000],
        ],
        "lifecycle": 5,
    },
]

# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(
                f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
            )

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {o.project}.{table_name}...")
        if drop_if_exists:
            print(f"Deleting {o.project}.{table_name} if exists...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(
            name=table_name,
            table_schema=table_schema,
            lifecycle=lifecycle,
            if_not_exists=True,
        )

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
                )
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)

        elif source_type == "records":
            records = data.get("records")
            if not records:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
                )
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {o.project}.{table_name} Done")


prepare_data(o, data_sets, "_maxframe_demo", True)

注意,此处使用刚才的项目名称和ENDPOINT地址进行替换

# 创建 odps 对象
# 基于实例 RAM 角色访问 ODPS
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    PROJECT_NAME = "[你的项目名]"
    ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"
)

image.png

使用 MaxFrame 进行数据分析

MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信,MaxFrame 会在远端保存中间状态

在创建完成后,可以从 session 对象中获得 session_id logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况

from maxframe.session import new_session
import maxframe.dataframe as md

# 初始化 MaxFrame Session
session = new_session(o)

# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")

# 打印 LogView 地址
print(f"MaxFrame LogView 地址: {session.get_logview_address()}")

此时会得到session_id 和 logview_address如下:

image.png

演示的场景是使用 MaxFrame 对商品和销售数据进行分析。这些数据统一存储于两张 MaxCompute 数据表中:商品表包含商品、价格等信息;销售表包含客户、产品、销售数量、销售年份、销售价格等信息

image.png

与 Pandas 一样,在获得 dataframe 对象之后,可以使用 dtypes 来查看数据的 schema。在下例中,md.read_odps_table 会获得 maxframe.DataFrame 对象

print('--------------Product Table Schema-----------------------')
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")
print(product.dtypes)

print('\n--------------Sales Table Schema-----------------------')
sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
print(sales.dtypes)

image.png

在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行

MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行。

sales.head(5).execute()

image.png

利用 MaxFrame 的功能,能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。

场景 1:Pandas merge接口的分布式执行

连接两张数据表,以获取 product_maxframe_demo_large 表中所有 sale_id 对应的 product_name 以及该产品的所有 year 和 price

# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)

sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")

# 这里的df并不会立即执行,除非您使用df.execute()来触发。
# 这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]

execute_result = df.execute()

image.png
image.png

df.execute() 会触发在 MaxCompute 集群上进行分布式计算,可以在 Logview 中查看当前计算的执行详情。

在内部测试中,Product 和 Sales 两张数据表的 6.8TB 数据在 MaxFrame 的 merge 算子中仅需 3 分钟即可完成计算。相比之下,在本地使用 Pandas 处理 6.8TB 的数据会超出内存限制,无法执行

image.png

场景 2:Pandas groupby、agg、sort_values 接口的分布式执行

连接 Product 和 Sales 两张数据表,聚合每个产品在 Sales 表中的首次售出年份

# 聚合每个产品的首次售出年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))

# 计算不同首次售出年份的产品 ID 数量
sum_product_min_year_df = min_year_df.groupby('first_year', as_index=False).agg(total_product_num=('product_id', 'count'))
exe_info = sum_product_min_year_df.execute()

image.png

image.png

在执行完上述分布式计算后,如果本地内存允许,可以对 execute() 的结果使用 fetch() 将结果拉全部取到本地使用,抓取后的数据会形成 Pandas DataFrame,因此对于 MaxFrame 尚无法支持的场景您也可以轻松的和本地的工作流结合。

当然,需要考虑取数的耗时、带宽和本地内存的限制,避免频繁的抓取和上传

local_df = exe_info.fetch().head(10)
local_df

image.png

在提取并展示了记录后,还可以使用 Matplotlib 生成图表来可视化数据。以下是一个示例代码,用于绘制每个首次售出年份的产品数量分布图

import matplotlib.pyplot as plt

plt.plot(local_df['first_year'],local_df['total_product_num'])
plt.xlabel('First_sale_year')
plt.ylabel('Total_product_num')
plt.title('Saled product distribution by first saled year')
plt.xticks(range(int(local_df['first_year'].min()), int(local_df['first_year'].max()) + 1))
plt.show()

image.png

在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源

session.destroy()

基于 UDF 加载 OSS 中的 FastText 模型进行分布式语言识别实验

MaxFrame User Define Function 提供了灵活的开发能力,支持您提供原生的 Python 函数在上万机器上分布式执行,这对于海量数据需要离线推理的场景极为重要

本文以 fasttext 模型为例,将带您了解 MaxFrame 的开发流程,以及如何在 MaxFrame UDF 中访问 OSS 并加载模型进行文本语言识别

执行 pip install maxframe 命令直接安装 MaxFrame SDK,还需要额外安装 oss2 用于上传测试模型文件。

!pip install maxframe oss2 alibabacloud_credentials -U

image.png

同样,准备如下:

PROJECT_NAME = "[你的项目名]"
ENDPOINT = "https://service.cn-hangzhou.maxcompute.aliyun.com/api"

不同的是这里会用到OSS的资源,按照如下格式填写 OSS 账户信息、bucket 信息和模型的路径

# 此处仅做演示,如果是生产环境,请考虑使用 STS Token 作为临时授权
OSS_ACCESS_ID="[your-oss-access-id]" # OSS 的 Access ID
OSS_SECRET_ACCESS_KEY="[your-oss-secret-id]" # OSS 的 Secret ID

# 填写 Bucket 信息
OSS_ENDPOINT="[your-oss-internet-endpoint]" # OSS 的公网访问 endpoint,您也可以使用 sts token
OSS_BUCKET_NAME="[your-oss-bucket]" # OSS Bucket 的名字
OSS_BUCKET_ENDPOINT=f"{OSS_BUCKET_NAME}.{OSS_ENDPOINT}:80"

# 填写路径信息
OSS_MODEL_PATH="maxframe_demo/fasttext_model/lid.176.ftz" # 模型在 OSS 中的路径
MODEL_URI="https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz" #模型下载的路径

最后运行如下脚本为您下载和上传 ,当然也可以手动在 OSS 控制台上传

import oss2
import requests

def upload_http_resource_to_oss(resource_url, auth, endpoint, bucket_name, object_path):
    # 创建Bucket实例
    bucket = oss2.Bucket(auth, endpoint, bucket_name)

    # 获取HTTP资源
    response = requests.get(resource_url)
    response.raise_for_status()
    print("下载完成")

    try:
        # 上传文件到OSS
        result = bucket.put_object(object_path, response.content)
        if result.status != 200:
            raise Exception("Upload failed with status: " + str(result.status))
        print("上传完成")
    except Exception as e:
        raise e

# 调用函数
auth = oss2.Auth(OSS_ACCESS_ID, OSS_SECRET_ACCESS_KEY)
upload_http_resource_to_oss(MODEL_URI, auth, OSS_ENDPOINT, OSS_BUCKET_NAME, OSS_MODEL_PATH)

image.png

image.png

MaxCompute 中默认不能访问外部网络,因此需要要将 bucket endpoint 配置到 maxcompute 控制台项目首页下方的 可用的外部网络地址 中,配置格式为 bucket_name.endpoint,如下图所示

image.png

准备演示数据

创建本示例所需的小规模测试数据,包含不同语言的文本

from alibabacloud_credentials import providers
from odps import ODPS
from odps.accounts import CredentialProviderAccount

# 创建 odps 对象
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    project=PROJECT_NAME,
    endpoint=ENDPOINT
)
import pandas as pd
import numpy as np
from odps.df import DataFrame as ODPSDataFrame

# 测试数据
data_sets = [
    {
   
        "table_name": "test_lang_text",
        "table_schema": "index bigint, text string",
        "source_type": "records",
        "records": [
            [1, "Welcome to MaxFrame Distributed Computing Framework."],
            [2, "欢迎使用 MaxFrame 分布式计算框架。"],
            [3, "Bienvenido al marco de computación distribuida MaxFrame."],
            [4, "Bienvenue dans le cadre de calcul distribué MaxFrame."],
            [5, "Willkommen beim MaxFrame-Distrubutionsrechnerrahmen."],
            [6, "MaxFrame分散コンピューティングフレームワークへようこそ。"],
            [7, "MaxFrame 분산 컴퓨팅 프레임워크에 오신 것을 환영합니다."],
            [8, "Добро пожаловать в распределённую вычислительную структуру MaxFrame."],
            [9, "Benvenuto nel framework di calcolo distribuito MaxFrame."],
            [10, "Bem-vindo ao framework de computação distribuída MaxFrame."]
        ],
        "lifecycle": 1,
    }
]

# 创建表
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(
                f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'."
            )

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {o.project}.{table_name}...")
        if drop_if_exists:
            print(f"Deleting {o.project}.{table_name} if exists...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(
            name=table_name,
            table_schema=table_schema,
            lifecycle=lifecycle,
            if_not_exists=True,
        )

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key."
                )
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)

        elif source_type == "records":
            records = data.get("records")
            if not records:
                raise ValueError(
                    f"Dataset at index {index} with source_type 'records' is missing the 'records' key."
                )
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {o.project}.{table_name} Done")

prepare_data(o, data_sets, "_maxframe_demo", True)

使用 MaxFrame 进行语言识别

MaxFrame 依赖于 MaxCompute 的资源来执行计算,因此首先需要建立一个 MaxCompute 会话。new_session 是 MaxFrame 创建会话的入口。在创建会话之后,所有后续的计算都将默认使用该会话进行通信。MaxFrame 会在远端保存中间状态,因此你可以进行交互式操作,一边开发一边验证

在创建完成后,可以从 session 对象中获得 session_id 和 logview_address,前者是该会话的唯一 id,后者是用于查看在该 session 中产生的所有计算的作业执行情况

在该例中,由于需要访问 OSS,需要在 options.sql.settings 中添加 odps.internet.access.list,值为上文中配置的外部网络白名单

import maxframe.dataframe as md
from maxframe.session import new_session

# 默认打印 Logview 信息
import logging
logging.basicConfig(level=logging.INFO)

from maxframe import options
options.sql.settings = {
   
    "odps.session.image": "common", # 使用默认镜像
    "odps.internet.access.list": OSS_BUCKET_ENDPOINT, #请替换成您的 bucket endpoint
    "odps.stage.mapper.split.size": 1 # 按 1MB 输入切割分片用于调整并发
}

# 初始化 MaxFrame Session
session = new_session(o)

# 打印 Session ID
print(f"MaxFrame Session ID: {session.session_id}")

image.png

数据探查

在上文数据准备阶段已经创建了临时表包含了如下内容。在 MaxFrame 中您可以使用 read_odps_table 来从表创建 DataFrame 对象

image.png

与 Pandas 一样,在获得 DataFrame 对象之后,可以使用 dtypes 来查看数据的 schema

print('--------------test_lang_text_maxframe_demo Table Schema-----------------------')
df = md.read_odps_table("test_lang_text_maxframe_demo", index_col="index")
print(df.dtypes)

在 MaxFrame 中,DataFrame 可以使用与 Pandas 兼容的接口进行计算,例如下例中的 head。但与 Pandas 不同的是,MaxFrame 中的数据并不存储在本地。MaxFrame 通过 Lazy 计算使你可以在本地使用各种算子,并通过 execute() 来触发计算一并提交到 MaxCompute 集群中分布式执行。

MaxFrame 已经对 Notebook 进行了适配,在 execute() 之后能够高效预览 MaxCompute 中数据的首末数行

df.head(5).execute()

image.png
image.png

image.png

利用 MaxFrame 的功能,我们能够使用与 Pandas 兼容的语法进行数据分析。所有算子的设计均为 分布式执行,确保计算的高效性。在模型调用的场景也一样,所有的输入数据会被自动分片在多台甚至上万台机器上并发运行。 相比于单机 Pandas,MaxFrame 的 Pandas 算子运行在 MaxCompute 计算集群中,能够大规模并行处理数据,数据读取效率也明显优于单机 Pandas。

对于 Fasttext 模型,我们可以使用一个 MaxFrame UDF 来加载 OSS 模型并进行推理。

在该 UDF 中,主要完成了如下工作:

  1. 使用自动化服务打包安装 oss2 和 fasttext

  2. 从 oss 下载加载模型

  3. 使用模型进行推理

  4. 把结果添加到行中作为新列返回

from maxframe.udf import with_python_requirements

# 1. 添加依赖自动打包
@with_python_requirements("oss2,fasttext,numpy==1.26.4")
def process(row, ak, sk, endpoint, bucket_name, model_path, _ctx={
   }):
    # 定义默认返回值
    row["lang"] = None
    row["score"] = None
    row["error"] = None

    try:
        def _load_model_once():
            """
            加载且只加载一次模型

            Returns:
                加载后的模型对象
            """
            if "model" in _ctx:
                return _ctx["model"]

            # 2. 下载并加载模型
            import oss2
            auth = oss2.Auth(ak, sk)
            bucket = oss2.Bucket(auth, endpoint, bucket_name)
            bucket.get_object_to_file(model_path, './model.bin')

            import fasttext
            model = fasttext.load_model('./model.bin')
            _ctx['model'] = model
            return model

        model = _load_model_once()
        # 3. 推理
        score = model.predict(row["text"])
        pred_label = score[0][0].replace('__label__', '')
        pred_score = score[1][0]

        # 4. 更新结果
        row["lang"] = pred_label
        row["score"] = float(pred_score)
    except Exception as e:
        row["error"] = str(e)
        raise e
        # raise for debugging

    return row

上述为原生 Python 函数,使用 apply 算子 可以将该函数发送到集群中完成计算。

需要注意的是,这里入参 row 为原生的 series 代表一行数据,返回的数据需要和 apply 算子中指定的 dtypes 在顺序和类型上都匹配,使用中如果遇到任何问题可以查看 apply 算子的参数说明。

# 声明返回值的类型,在该例中,将会在每行按顺序添加三个新列 lang, score, error
return_schema = df.dtypes.copy()
return_schema["lang"] = np.str_
return_schema["score"] = np.float_
return_schema["error"] = np.str_

# 使用 apply 算子在 dataframe df 上进行计算得到 result_dfs
result_df = df.apply(
    process, # 指定函数
    axis=1, # 指定按行计算
    result_type="expand", # 返回多列
    output_type="dataframe", # 返回为 dataframe
    dtypes=return_schema, # 设置返回行类型
    ak=OSS_ACCESS_ID, # 以下向 UDF 传参
    sk=OSS_SECRET_ACCESS_KEY,
    endpoint=OSS_ENDPOINT,
    bucket_name=OSS_BUCKET_NAME,
    model_path=OSS_MODEL_PATH)

# 运行并打印预览结果
execute_info = result_df.execute()

image.png
image.png

如果您想在本地查看所有计算结果,可以对 execute_info 使用 fetch() 方法将所有结果提取到本地进行查看

execute_info.fetch()

image.png

如果您接下来对数据的处理依旧发生在 MaxCompute 中,您可以对 result_df 使用 to_odps_table 将所有的结果写入指定表中

execute_info = md.to_odps_table(result_df, "test_lang_paragraph_maxframe_demo_result", overwrite=True, lifecycle=1).execute()

image.png
image.png

image.png

在所有计算完成后,请确保使用 destroy() 方法来销毁会话,以释放资源

session.destroy()

总结

这次我另辟蹊径,没有完全按照官方给出的最佳实践文档依葫芦画瓢,而是按照PAI中的实验步骤来完成的,通过这次体验确实有较为深入的理解了MaxFrame的用法,它不仅简化了从数据处理到模型训练的整个流程,还通过其强大的功能集显著提升了工作效率与创新能力

比如在企业内部,MaxFrame可以极大地促进跨部门间协作。例如,在数据分析团队与开发团队之间,常常存在信息孤岛的问题,导致沟通成本增加,项目进展缓慢。而MaxFrame凭借其高度兼容的Python接口以及对MaxCompute计算资源和服务的无缝集成,使得不同背景的技术人员能够在一个统一平台上协同工作。数据科学家可以直接使用熟悉的Pandas语法进行大规模数据操作,无需担心底层架构复杂性;与此同时,工程师们也可以轻松调用各种高级算子来构建高效的机器学习流水线。

在实际操作中确实也有碰到部分问题,但是最后通过查询资料和社区的问答都得以解决了。

问题一:读表时读到有列是JSON格式的就报错。当时我想将一个包含多种数据类型的表格上传到MaxFrame中,但是其中有一列是用于存储用户的偏好设置,所以设置的格式为JSON,上传后发现报错了。

经过一番排查和研究文档后,发现MaxFrame默认并不支持直接读取JSON格式的数据列。MaxFrame为了保证计算效率和稳定性,默认情况下只处理简单且固定结构的数据类型。对于JSON这样的非结构化数据,需要额外配置才能正确解析。

解决办法是考虑使用MaxCompute提供的内置函数如get_json_object()来提取JSON中的特定值。比如说我的JSON字段里有一个名为“preferences”的键,那么可以通过get_json_object(json_column, '$.preferences')的方式获取对应的值。这种方法适用于只需要从JSON中抽取少量关键信息的情况。但如果涉及到更复杂的JSON结构或者需要频繁访问多个层级的数据,则可能需要先将JSON列转换成表格形式再导入MaxFrame中。

问题二是使用MaxCompute时碰到的

image.png

错误原因是因为在创建会话时,_odps_entry 对象没有被正确初始化或设置,导致其为 None,此时尝试访问 endpoint 属性时,抛出了 AttributeError,进一步检查访问密钥、安全密钥以及项目名称等是否正确填写。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
1天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
12 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
4天前
|
人工智能 运维 Kubernetes
阿里云容器服务AI助手2.0 - 新一代容器智能运维能力
2024年11月,阿里云容器服务团队进一步深度融合现有运维可观测体系,在场景上覆盖了K8s用户的全生命周期,正式推出升级版AI助手2.0,旨在更好地为用户使用和运维K8S保驾护航。
|
7天前
|
机器学习/深度学习 存储 人工智能
2024阿里云AI交出答卷,全球领先!
2024阿里云AI交出答卷,全球领先!
53 9
2024阿里云AI交出答卷,全球领先!
|
1天前
|
人工智能 搜索推荐 Serverless
打造智能购物新体验:主动式智能导购AI助手解决方案评测
阿里云推出的《主动式智能导购AI助手构建》解决方案,基于百炼大模型和函数计算,采用Multi-Agent架构,提供个性化、智能化的购物体验。系统具备主动交互、精准推荐、自动化架构等亮点,支持快速部署和生产环境应用。评测结果显示,该方案在功能效果和架构设计上表现出色,但仍需优化文档和技术细节。欢迎参加官方评测活动... 详细评测及参与方式请参考:[链接](https://developer.aliyun.com/topic/build-an-ai-shopping-assistant?spm=a2c6h.12873639.article-detail.17.13902d93dZhiyK)。
17 1
打造智能购物新体验:主动式智能导购AI助手解决方案评测
|
8天前
|
人工智能 Serverless API
《智能导购 AI 助手构建》解决方案评测:极具吸引力的产品,亟待完善的教程文档
《智能导购 AI 助手构建》解决方案评测:极具吸引力的产品,亟待完善的教程文档
68 8
《智能导购 AI 助手构建》解决方案评测:极具吸引力的产品,亟待完善的教程文档
|
4天前
|
存储 人工智能 自然语言处理
效率翻倍!2024免费AI流程图生成工具评测
2分钟了解有哪些好用的AI流程图生成工具。
40 4
效率翻倍!2024免费AI流程图生成工具评测
|
7天前
|
人工智能 云计算
官宣!阿里云成为总台春晚云计算AI独家合作伙伴
官宣!阿里云成为总台春晚云计算AI独家合作伙伴
154 22
|
1天前
|
人工智能 搜索推荐 算法
解决方案评测|主动式智能导购AI助手构建
阿里云的主动式智能导购AI助手是电商商家提升用户体验和销量的利器。它能实时分析用户行为,提供个性化推荐,支持多渠道无缝对接,并具备语音和文本交互功能。通过注册阿里云账号、开通服务、配置项目、设置推荐策略、集成到平台并测试优化,商家可以轻松部署这一工具。关键代码示例帮助理解API对接和数据处理。建议增强个性化推荐算法、优化交互体验并增加自定义选项,以进一步提升效果。
31 11
|
6天前
|
机器学习/深度学习 人工智能 安全
阿里云先知安全沙龙(武汉站) ——AI赋能软件漏洞检测,机遇, 挑战与展望
本文介绍了漏洞检测的发展历程、现状及未来展望。2023年全球披露的漏洞数量达26447个,同比增长5.2%,其中超过7000个具有利用代码,115个已被广泛利用,涉及多个知名软件和系统。文章探讨了从人工审计到AI技术的应用,强调了数据集质量对模型性能的重要性,并展示了不同检测模型的工作原理与实现方法。此外,还讨论了对抗攻击对模型的影响及提高模型可解释性的多种方法,展望了未来通过任务大模型实现自动化漏洞检测与修复的趋势。
|
1天前
|
人工智能 运维 监控
阿里云Milvus产品发布:AI时代云原生专业向量检索引擎
随着大模型和生成式AI的兴起,非结构化数据市场迅速增长,预计2027年占比将达到86.8%。Milvus作为开源向量检索引擎,具备极速检索、云原生弹性及社区支持等优势,成为全球最受欢迎的向量数据库之一。阿里云推出的全托管Milvus产品,优化性能3-10倍,提供企业级功能如Serverless服务、分钟级开通、高可用性和成本降低30%,助力企业在电商、广告推荐、自动驾驶等场景下加速AI应用构建,显著提升业务价值和稳定性。