本文作者:阿里云数据库开发专家 陈枢华
背景与挑战
Dify 作为一款低代码 AI 应用开发平台,凭借其直观的可视化工作流编排能力,极大降低了大模型应用的开发门槛。然而,在实际企业级落地过程中,我们发现其原生能力仍存在两个关键瓶颈:
- 代码执行能力受限:Dify 内置的 Sandbox 节点虽支持基础 Python 代码执行,但无法安装自定义 Python 包,难以支撑复杂的业务逻辑、数据处理或算法调用;
- 缺乏自动化调度机制:Dify 原生架构不支持 Agent 或 Agentic Workflow 的定时触发、周期性运行与依赖编排,导致其难以融入企业级自动化运维体系。
这两个问题严重制约了 Dify 在生产环境中的深度应用——尤其当我们希望构建一个具备“感知-决策-执行-反馈”闭环能力的智能 Agent 时,仅靠 Dify 自身往往力不从心。
为突破这些限制,我们在实践中探索出一套 “Dify + DMS Notebook + DMS Airflow”三位一体的一站式增强开发架构,有效补齐了 Dify 在执行能力与调度能力上的短板:
- ✅ DMS Notebook:提供完整、可定制的 Python 运行环境,支持第三方库安装、交互式开发与复杂逻辑实现,作为 Dify Sandbox 的强力补充;
- ✅ DMS Airflow:作为统一调度引擎,实现对 Dify 工作流、Notebook 脚本或 Agent 任务的定时触发、依赖管理与可靠执行;
- ✅ DMS 平台集成:实现从开发、调试、部署到调度、监控的全链路闭环管理,显著提升 Dify 在企业场景下的工程化落地能力。
本文将以一个销售数据分析机器人的完整开发案例,详细介绍如何基于 DMS 平台,构建一个可调度、可扩展、可运维的 Agent 系统。
使用 DMS Notebook 扩展 Dify 的代码执行能力
为什么需要 Notebook?
在 Dify 中,若需调用 Pandas 进行数据清洗、使用 Prophet 做时间序列预测,或集成企业内部 SDK,Sandbox 节点往往无能为力。而 DMS Notebook 提供了完整的 Python 环境,支持:
- 自定义 pip 包安装;
- 环境变量配置(如 AK/SK、API Key);
- 异步服务开发(如 FastAPI);
- 与 VPC 内其他服务安全互通。
这使其成为 Dify 外部能力扩展的理想“执行单元”。
步骤详解:构建一个销售数据分析 API 服务
1.创建 DMS Notebook 会话
- 进入 DMS 控制台 > Notebook 会话 > 创建会话;
- 各参数的定义如下:
参数 |
说明 |
所属集群 |
选择目标集群类型。
您可在集群下拉列表单击创建集群,选择创建Spark集群。 |
会话名称 |
您可自定义会话名称。 |
镜像 |
页面会根据您选择的集群类型展示对应的镜像规格。
|
规格 |
Driver的资源规格。
|
配置 |
profile资源。 您可编辑profile的名称、资源释放时长、数据存储位置、Pypi包管理和环境变量信息。 说明 资源释放时长:当资源空闲时间超过设置的时长,则会自动释放。资源释放时长设置为0,表示资源永久不会自动释放。 |
- 选择合适的 Python 镜像版本;
- 在 配置 > 编辑设置 中:
- PyPI 包管理:按 requirements.txt 格式填入依赖(如 pandas, fastapi, uvicorn, nest-asyncio);
- 环境变量:设置 ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET、大模型 API Key 等;
- 关键配置:
- fastapi、uvicorn、nest-asyncio库是必须要安装的;
- 设置 资源释放时间 = 0(防止服务被自动释放);
- 设置环境变量 DMS_KERNEL_IDLE_TIMEOUT=0(避免 Jupyter Kernel 因空闲被 kill)。
💡 踩坑提示:若未设置 DMS_KERNEL_IDLE_TIMEOUT=0,长时间运行的 API 服务可能在空闲数分钟后被系统回收,导致后续调用失败。
- 创建完成后,在notebook会话窗口中点击启动即可
2.编写并启动 FastAPI 服务
- 点击文件夹图标,右键点击default(默认库),再点击新建Notebook文件
- 在代码块中编写相关的python代码,可以参考以下模板构建你的API服务,更多FastAPI相关的使用方法请查看官方文档https://fastapi.tiangolo.com/
import os from fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Header from fastapi.staticfiles import StaticFiles from typing import Optional import nest_asyncio import asyncio import httpx import io ''' 注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。 ''' nest_asyncio.apply() app = FastAPI(title="Your Service Name", description="Description of Your Service") static_dir = "static" if not os.path.exists(static_dir): os.makedirs(static_dir) app.mount("/static", StaticFiles(directory=static_dir), name="static") @app.get("/") async def root(): """ 根节点,返回服务基本信息 curl -X GET "http://127.0.0.1:8000/" """ return { "message": "Service is running", "documentation": "/docs", "note": "..." } @app.post("/process-data/{item_id}") async def process_data( request: Request, # 使用 Request 对象来接收原始请求 item_id: int = Path(..., title="物品ID", ge=1), is_premium: bool = Query(False, description="是否为高级物品"), x_token: Optional[str] = Header(None, description="自定义的认证Token") ): """ 接收 JSON 请求体、路径参数、查询参数和请求头。 curl -X POST "http://127.0.0.1:8000/process-data/101?is_premium=true" \ -H "Content-Type: application/json" \ -H "X-Token: my-secret-token" \ -d '{"name": "笔记本电脑", "price": 7999.9, "tags": ["electronics", "office"]}' """ if x_token != "my-secret-token": raise HTTPException(status_code=401, detail="X-Token 无效") try: json_body = await request.json() name = json_body.get("name") price = json_body.get("price") if not name or not price: raise HTTPException(status_code=400, detail="请求体中缺少 'name' 或 'price'") return { "message": "数据处理成功", "received_data": { "item_id": item_id, "is_premium": is_premium, "x_token": x_token, "body": json_body } } except Exception as e: raise HTTPException(status_code=500, detail=f"服务执行错误: {str(e)}") @app.post("/upload-file") async def upload_file( token: str = Form(...), file: UploadFile = File(...) ): """ 通过表单(form-data)上传文件和附带的文本信息。 curl -X POST "http://127.0.0.1:8000/upload-file" \ -F "file=@./test_upload.txt" \ -F "token=my-form-token" """ if not os.path.exists("test_upload.txt"): with open("test_upload.txt", "w") as f: f.write("This is a test file for curl upload.") try: contents = await file.read() file_location = os.path.join(static_dir, file.filename) with open(file_location, "wb") as f: f.write(contents) return { "message": "文件上传成功!", "token": token, "filename": file.filename, "file_size": len(contents), "file_url": f"/static/{file.filename}" } except Exception as e: raise HTTPException(status_code=500, detail=f"文件处理错误: {str(e)}") @app.get("/status") async def get_server_status(): """ 获取服务器状态。 curl -X GET "http://127.0.0.1:8000/status" """ return {"status": "running"} async def run_server(host="127.0.0.1", port=8000): """在后台运行uvicorn服务器""" import uvicorn config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) await server.serve() task = asyncio.create_task(run_server(host="0.0.0.0", port=8000)) await asyncio.sleep(2) async with httpx.AsyncClient() as client: print("正在向 http://127.0.0.1:8000/status/ 发送请求...") response = await client.get("http://127.0.0.1:8000/status") if response.status_code == 200: print("服务启动成功") else: print("服务启动失败,请检查报错信息")
• 接下来我们以构建一个单日销售数据分析的API为例,代码内容如下:
import os import pandas as pd from fastapi import FastAPI, HTTPException, Request, File, UploadFile from fastapi.staticfiles import StaticFiles import nest_asyncio import asyncio import httpx import io ''' 注意,Jupyter本身就在一个asyncio事件循环中运行。我们不能在已有循环中直接运行另一个循环,但nest_asyncio这个库可以“打补丁”,允许我们这样做。 ''' nest_asyncio.apply() app = FastAPI(title="Sales Data Analysis Service", description="Provides data analysis and chart generation capabilities for Dify") static_dir = "static" if not os.path.exists(static_dir): os.makedirs(static_dir) app.mount("/static", StaticFiles(directory=static_dir), name="static") def load_sales_data_from_file(file_content: bytes): """从上传的文件内容加载销售数据""" try: csv_string = file_content.decode('utf-8') df = pd.read_csv(io.StringIO(csv_string)) required_columns = ['Date', 'Product', 'Price', 'Amount', 'Region'] if not all(col in df.columns for col in required_columns): raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}") df['Date'] = pd.to_datetime(df['Date']) df['Price'] = pd.to_numeric(df['Price'], errors='coerce') df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce') df['Sales'] = df['Price'] * df['Amount'] return df except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}") @app.get("/") async def root(): """Root endpoint, returns service information""" return { "message": "Sales Data Analysis Service is running", "documentation": "/docs", "endpoints": [ "POST /analysis/daily_sale_analysis" ], "note": "Require a CSV file upload with columns: Date, Product, Price, Amount, Region" } @app.post("/analysis/daily_sale_analysis") async def daily_sale_analysis( file: UploadFile = File(...) ): """当日销售数据分析 - 分析上传文件中的销售数据""" try: if not file.filename.endswith('.csv'): raise HTTPException(status_code=400, detail="文件必须是CSV格式") file_content = await file.read() df = load_sales_data_from_file(file_content) df['Date'] = pd.to_datetime(df['Date']).dt.date unique_dates = sorted(df['Date'].unique()) if len(unique_dates) == 0: raise HTTPException(status_code=400, detail="数据文件中没有有效的日期数据") target_date = unique_dates[-1] if len(unique_dates) > 1 else unique_dates[0] daily_data = df[df['Date'] == target_date].copy() if daily_data.empty: raise HTTPException(status_code=400, detail=f"没有找到日期 {target_date} 的销售数据") total_sales = daily_data['Sales'].sum() total_orders = len(daily_data) total_quantity = daily_data['Amount'].sum() avg_order_value = total_sales / total_orders if total_orders > 0 else 0 product_analysis = daily_data.groupby('Product').agg({ 'Sales': 'sum', 'Amount': 'sum', 'Price': 'mean' }).round(2) top_products = product_analysis.sort_values('Sales', ascending=False).head(5) top_products_list = [] for product, row in top_products.iterrows(): top_products_list.append({ "product": product, "sales": float(row['Sales']), "quantity": int(row['Amount']), "avg_price": float(row['Price']) }) region_analysis = daily_data.groupby('Region').agg({ 'Sales': 'sum', 'Amount': 'sum' }).round(2) top_regions = region_analysis.sort_values('Sales', ascending=False) region_list = [] for region, row in top_regions.iterrows(): region_list.append({ "region": region, "sales": float(row['Sales']), "quantity": int(row['Amount']), "percentage": round(float(row['Sales']) / total_sales * 100, 2) }) daily_data['price_range'] = pd.cut(daily_data['Price'], bins=[0, 100, 500, 1000, 5000, float('inf')], labels=['0-100', '100-500', '500-1000', '1000-5000', '5000+']) price_range_analysis = daily_data.groupby('price_range').agg({ 'Sales': 'sum', 'Amount': 'sum' }).round(2) price_ranges = [] for price_range, row in price_range_analysis.iterrows(): if not pd.isna(row['Sales']) and row['Sales'] > 0: price_ranges.append({ "range": str(price_range), "sales": float(row['Sales']), "quantity": int(row['Amount']) }) insights = [] if total_sales > 100000: insights.append(f"当日销售表现优秀,总销售额达到 {total_sales:,.2f} 元") elif total_sales > 50000: insights.append(f"当日销售表现良好,总销售额为 {total_sales:,.2f} 元") else: insights.append(f"当日销售额为 {total_sales:,.2f} 元,可能需要关注销售策略") if len(top_products_list) > 0: best_product = top_products_list[0] insights.append(f"最佳销售产品是 {best_product['product']},销售额 {best_product['sales']:,.2f} 元") if len(region_list) > 0: best_region = region_list[0] insights.append(f"销售表现最佳的地区是 {best_region['region']},占总销售额的 {best_region['percentage']}%") if avg_order_value > 1000: insights.append(f"平均订单价值较高,为 {avg_order_value:,.2f} 元,显示客户购买力强") return { "analysis_date": str(target_date), "summary": { "total_sales": round(float(total_sales), 2), "total_orders": int(total_orders), "total_quantity": int(total_quantity), "average_order_value": round(float(avg_order_value), 2) }, "top_products": top_products_list, "region_analysis": region_list, "price_range_analysis": price_ranges, "insights": insights, "data_info": { "date_range": f"{unique_dates[0]} 到 {unique_dates[-1]}" if len(unique_dates) > 1 else str(unique_dates[0]), "total_records": len(daily_data), "unique_products": len(daily_data['Product'].unique()), "unique_regions": len(daily_data['Region'].unique()) } } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"当日销售数据分析错误: {str(e)}") @app.get("/status") async def get_server_status(): """获取服务器状态""" try: return { "status": "running" } except Exception as e: raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}") async def run_server(host="127.0.0.1", port=8000): """在后台运行uvicorn服务器""" import uvicorn config = uvicorn.Config(app, host=host, port=port, log_level="info") server = uvicorn.Server(config) await server.serve() task = asyncio.create_task(run_server(host="0.0.0.0", port=8000)) await asyncio.sleep(2) async with httpx.AsyncClient() as client: print("正在向 http://127.0.0.1:8000/status/ 发送请求...") response = await client.get("http://127.0.0.1:8000/status") if response.status_code == 200: print("服务启动成功") else: print("服务启动失败,请检查报错信息")
✅ 异步支持:Jupyter 内置 asyncio 事件循环,可直接使用 async/await。
- 运行相关代码块,可以在输出部分看到API服务启动成功
3.查看IP地址
- 在notebook代码块中,可以使用英文感叹号+终端命令的形式来执行命令,你也可以使用!pip install xxx来安装额外需要的python包。接下来新建一个代码块,在代码块中输入!ifconfig并点击运行查看该 Notebook 会话在VPC中的IP地址,图中172.16.0.252即为所需的IP地址,API服务地址即为:http://172.16.0.252:8080/analyze_sales
在Dify on DMS实例中访问服务
现在我们使用这份模拟销售数据文件来访问API服务:
- 在工作流中添加HTTP请求节点
- 通过http://:<端口>/xxx 访问创建的API的服务,并在BODY中传入相应的参数
- 测试运行可以看到该请求成功返回了响应的输出
- 你也可以在Notebook会话中看到相应的服务被调用
接下来以该Dify工作流为例子进行完整的服务调用
在钉钉群组中添加一个自定义的机器人,并参考https://open.dingtalk.com/document/orgapp/robot-overview 了解如何获取钉钉机器人的access_token和sign_secret
在填写完你的钉钉机器人参数之后,点击右上角的运行-》从本地上传上传示例的销售数据,并点击开始运行
钉钉群中的机器人成功发送了对该销售数据的分析报告
点击右上角的发布-》发布更新发布工作流用于后面的定时调用
使用 DMS Airflow 实现定时调度
创建DMS Airflow实例
参照以下链接在DMS中创建Airflow实例
https://help.aliyun.com/zh/dms/purchase-airflow-resources
https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-instance
更多关于Airflow的操作,参照https://airflow.apache.org/docs/apache-airflow/stable/index.html
编写 DAG:每日自动触发销售分析
- 以下是示例的python代码,用于定时调用Dify工作流的API
import pendulum import requests import json from airflow.models.dag import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable CSV_FILE_PATH = "/yourpath/daily_sale_data.csv" DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1" # 替换成你的 Dify Workflow API URL DIFY_API_KEY = Variable.get("dify_api_key") APP_API_KEY= Variable.get("app_api_key") def call_dify_workflow_with_csv(**kwargs): """ 读取 CSV 文件内容,并将其作为文件上传调用 Dify 工作流。 """ print(f"准备从 '{CSV_FILE_PATH}' 读取文件...") try: with open(CSV_FILE_PATH, 'rb') as f: files_to_upload = { 'file': ('daily_sale_data.csv', f, 'document/csv') } headers = { 'Authorization': f'Bearer {APP_API_KEY}', 'DifyApiKey': f'{DIFY_API_KEY}', } file_upload_response=requests.post( DIFY_API_URL+'/files/upload', headers=headers, data={'user': 'airflow-user-demo'}, files=files_to_upload, ) print(file_upload_response.json()) file_id=file_upload_response.json().get('id') headers.update({'Content-Type': 'application/json'}) input_data = { 'sales_data': { "type": "document", "transfer_method": "local_file", "upload_file_id": file_id } } data = { 'inputs': input_data, 'user': 'airflow-user-demo', 'response_mode': 'blocking', } print("开始调用 Dify API...") print(f"URL: {DIFY_API_URL}") response = requests.post( DIFY_API_URL+'/workflows/run', headers=headers, json=data, ) response.raise_for_status() print(f"API 调用成功!状态码: {response.status_code}") print("--- Dify API Response ---") print(response.json()["data"]["outputs"]["answer"]) print("\n--- End of Response ---") except FileNotFoundError: print(f"错误:文件未找到于 '{CSV_FILE_PATH}'") raise except requests.exceptions.RequestException as e: print(f"API 调用失败: {e}") raise with DAG( dag_id="dify_workflow", start_date=pendulum.datetime(2023, 10, 27, tz="Asia/Shanghai"), schedule="0 8 * * *", catchup=False, tags=["dify", "api", "example"], doc_md=""" 此 DAG 每天早上8点执行,它会: 1. 从本地文件系统读取一个 CSV 文件。 2. 将该 CSV 文件作为附件,调用一个 Dify 工作流。 3. 打印出 Dify API 的响应。 """ ) as dag: run_dify_workflow = PythonOperator( task_id="call_dify", python_callable=call_dify_workflow_with_csv, )
📌 注意:通过API调用Dify工作流中想要上传本地文件,需要先通过/files/upload接口传入相应的文件获取文件id,再将文件id传入工作流中。
- 创建完成后打开Airflow实例,可以看到创建的定时任务
- 每日 8:00,系统自动调用 Dify 工作流,最终由钉钉机器人推送分析报告
总结与思考
通过 DMS Notebook + DMS Airflow 对 Dify 的能力扩展,我们成功构建了一个具备以下特性的企业级 Agent 开发范式:
能力维度 |
原生 Dify |
增强后方案 |
复杂代码执行 |
❌ 有限 |
✅ 完整 Python 环境 |
第三方库支持 |
❌ 不支持 |
✅ 自定义安装 |
定时/周期调度 |
❌ 无 |
✅ Airflow 精准调度 |
工程化运维 |
❌ 弱 |
✅ 全链路可观测、可管理 |
这套方案不仅解决了 Dify 当前的局限性,更重要的是,它保留了 Dify 的低代码优势,同时通过与成熟数据基础设施(Notebook + Airflow)的深度集成,实现了 “敏捷开发”与“工程可靠”的平衡。
🌟 核心理念:Agent 的价值不在于“全自动”,而在于“可扩展、可调度、可运维”。真正的生产级智能系统,一定是平台能力与工程实践的结合体。
了解更多
钉钉搜索:96015019923,加入Dify on DMS用户群!
Dify开通申请:https://page.aliyun.com/form/act1621739703/index.htm