1. 前言
本方案接入了最新开源的Qwen3-235B-A22B模型,在以往的MCP文章中,我们介绍了MCP的概念原理,今天这篇文章将结合实际场景,从用户角度出发,思考MCP在未来AI场景中更深度的用途和作用。去探索MCP的能做什么,以及MCP的能力范围是什么。
本文中,将使用通义千问大语言模型通过MCP的多个Servers和部署在阿里云自研GPU集群上的ComfyUI Server进行交互,实现打破AI云服务的数据孤岛的诉求。
最终能够做到通过大模型让ComfyUI服务获取本地、网络、数据库等数据源的数据,让ComfyUI生产的图像、视频传输到本地、云上存储、数据库,甚至是自动发布到微博、抖音、小红书、微信公众号等媒体社交平台,方案架构如下:
通过上面的方案架构图可以看到,通过MCP可以很便捷的接入多个MCP Server让大模型调度、协作,相比多Agent的方案,可扩展性、易用性都很高。
2. 实践方案
2.1 场景介绍
从本地存储prompt的文件中读取内容,然后将prompt内容转换成图片,再将生成的图片下载到本地的images文件夹中。最后再将这些图片配合描述生成小红书文案,发布到小红书中。
- MCP Server
- FileSystem(现有可用)
- 小红书(个人开发部署)
- ComfyUI(个人开发部署)
- 大模型
- Qwen3-235B-A22B
- prompt内容
一只小狗在草地上奔跑 一群小朋友在欢乐的玩耍 写实风格的山水画
2.2 效果展示
2.3 最终效果
笔记最终成功发布到小红书中
3. 环境准备
3.1 Python
Python 3.10版本及以上
3.2 MCP
MCP环境准备参考官方文档:地址
3.3 ComfyUI
服务可以部署在本地,也可以是云端。具体安装部署请参考官方文档:地址
4. 实践部署
4.1 通义Qwen3接入
前往百炼申请AccessKey [1]
VSCode + CLine
1. VSCode安装CLine插件
2. CLine接入Qwen3接口
- 选择“OpenAI Compatible”
- Base URL添加:链接
- Model ID: qwen3-235b-a22b
4.2 MCP Server
FileSytem
参考百炼MCP Filesystem Server[2]
Filesystem MCP Server 功能概述
- 安全的终端命令执行:允许在受控环境中运行命令而不影响系统的安全性。
- 目录导航:支持对目录结构的查询和浏览。
- 文件系统操作:包括创建、读取、更新和删除文件等基本文件操作功能。
ComfyUI MCP Server
代码
1. 本server实现代码中,使用了固定的workflow,可以按需替换成自己需要的workflow;
2. 只针对文生图场景的实现,其他场景可以按需自己添加mcp server tool;
import urllib import uuid from typing import Dict, Any, List import httpx import logging import websockets from PIL import Image import io from fastmcp import FastMCP from fastmcp.prompts import UserMessage import json from time import sleep # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("comfy-image-mcp-server") COMFY_SERVER = "8.147.113.150:8000" CLIENT_ID = str(uuid.uuid4()) # Create a basic server instance mcp = FastMCP(name="CompyImageServer") # You can also add instructions for how to interact with the server mcp_with_instructions = FastMCP( name="HelpfulAssistant", instructions="这个服务是用来通过comfyui生成图片的." "调用generate_image_async()来异步地生成需要的图片." ) def queue_prompt(prompt: Dict[str, Any] = None) -> Dict[str, Any]: url = f"http://{COMFY_SERVER}/api/prompt" json = { "prompt": prompt, "client_id": CLIENT_ID } try: response = httpx.post(url=url, json=json, verify=False, trust_env=False, timeout=60.0) if response.status_code != 200: raise RuntimeError(f"Failed to queue prompt: {response.status_code} - {response.text}") return response.json() except httpx.RequestError as e: # logger.info(traceback.format_exc()) raise RuntimeError(f"HTTP request failed: {e}") async def download_image(prompt_id: str) -> bytes: uri = f"ws://{COMFY_SERVER}/ws?clientId={CLIENT_ID}" logger.info(f"Connecting to websocket at {uri}") async with websockets.connect(uri) as websocket: while True: try: message = await websocket.recv() if isinstance(message, str): try: data = json.loads(message) logger.info(f"Received text message: {data}") if data.get("type") == "executing": exec_data = data.get("data", {}) if exec_data.get("prompt_id") == prompt_id: node = exec_data.get("node") logger.info(f"Processing node: {node}") if node is None: logger.info("Generation complete signal received") break except: pass else: logger.info(f"Received binary message of length: {len(message)}") if len(message) > 8: # Check if we have actual image data return message[8:] # Remove binary header else: logger.warning(f"Received short binary message: {message}") except websockets.exceptions.ConnectionClosed as e: logger.error(f"WebSocket connection closed: {e}") break except Exception as e: logger.error(f"Error processing message: {e}") continue def get_image(filename, subfolder, folder_type): data = {"filename": filename, "subfolder": subfolder, "type": folder_type} url_values = urllib.parse.urlencode(data) with urllib.request.urlopen("http://{}/view?{}".format(COMFY_SERVER, url_values)) as response: return response.read() def get_history(prompt_id): with urllib.request.urlopen("http://{}/history/{}".format(COMFY_SERVER, prompt_id)) as response: return json.loads(response.read()) def get_image_and_download(prompt_id, path): output_images = [] while True: history = get_history(prompt_id)[prompt_id] if history['status']['status_str'] == 'success': for node_id in history['outputs']: node_output = history['outputs'][node_id] images_output = [] if'images' in node_output: for image in node_output['images']: image_data = get_image(image['filename'], image['subfolder'], image['type']) image_bytes = Image.open(io.BytesIO(image_data)) file_path = "" if path.endswith("/"): file_path = path + image['filename'] else: file_path = path + "/" + image['filename'] image_bytes.save(file_path) output_images.append(file_path) # images_output.append(image_data) # output_images[node_id] = images_output # output_images_names[node_id] = return output_images else: logger.info(f"promot {prompt_id} unfinished meta: {history}") sleep(1) continue @mcp.prompt() def generate_image_request(prompt: str, style: str = "动漫风格") -> UserMessage: """Generates a user message requesting image generation""" content = f"生成一个comfyui的英文prompt,要求包含下面的内容: {prompt} 并且要求生成的图片需要具有很高的质量,风格是{style}" return UserMessage(content=content) @mcp.tool() def generate_image_async(prompt: str = "a cat with yellow hat", width=512, height=512,seed=4787458) -> Dict[str, Any]: workflow = { "6": { "inputs": { "text": prompt, "clip": [ "30", 1 ] }, "class_type": "CLIPTextEncode", "_meta": { "title": "CLIP Text Encode (Positive Prompt)" } }, "8": { "inputs": { "samples": [ "31", 0 ], "vae": [ "30", 2 ] }, "class_type": "VAEDecode", "_meta": { "title": "VAE解码" } }, "9": { "inputs": { "filename_prefix": "ComfyUI", "images": [ "8", 0 ] }, "class_type": "SaveImage", "_meta": { "title": "保存图像" } }, "27": { "inputs": { "width": width, "height": height, "batch_size": 1 }, "class_type": "EmptySD3LatentImage", "_meta": { "title": "空Latent图像(SD3)" } }, "30": { "inputs": { "ckpt_name": "flux1-dev-fp8.safetensors" }, "class_type": "CheckpointLoaderSimple", "_meta": { "title": "Checkpoint加载器(简易)" } }, "31": { "inputs": { "seed": seed, "steps": 20, "cfg": 1, "sampler_name": "euler", "scheduler": "simple", "denoise": 1, "model": [ "30", 0 ], "positive": [ "35", 0 ], "negative": [ "33", 0 ], "latent_image": [ "27", 0 ] }, "class_type": "KSampler", "_meta": { "title": "K采样器" } }, "33": { "inputs": { "text": "", "clip": [ "30", 1 ] }, "class_type": "CLIPTextEncode", "_meta": { "title": "CLIP Text Encode (Negative Prompt)" } }, "35": { "inputs": { "guidance": 3.5, "conditioning": [ "6", 0 ] }, "class_type": "FluxGuidance", "_meta": { "title": "Flux引导" } } } return queue_prompt(workflow) @mcp.tool() def get_image_status_and_download_to_local(prompt_id:str, absolute_path: str = "/Users/wangrupeng/Documents/work/files/images") -> List[str]: """get image generate status and download to local when it's generating success""" images = get_image_and_download(prompt_id, absolute_path) return images if __name__ == "__main__": mcp.run(transport="sse",host="127.0.0.1", port=9000) # create_simple_note() # print(generate_image_async("a cute girl with red hat standing on the green land")) # images = download_image_to_local("03e6da53-9779-4af8-b7a9-564f90eeea36") # print(images)
启动
此处记住端口
fastmcp run server.py:mcp --transport sse --host 127.0.0.1 --port 9000
第三方媒体Server
1. 参考Github项目 social-auto-upload:链接,支持抖音、B站、小红书等社交媒体;
2. 自己实现的server功能比较简单,仅为Demo展示使用;
代码
import json import logging import httpx from fastmcp import FastMCP from time import sleep from playwright.sync_api import sync_playwright from xhs import XhsClient cookie = "a1=xxxx;" XIAOHONGSHU_SERVER = "127.0.0.1:5005"# social-auto-load服务 def sign(uri, data=None, a1="", web_session=""): for _ in range(10): try: with sync_playwright() as playwright: stealth_js_path = "/Users/wangrupeng/Documents/dev/github/stealth_min/stealth.min.js" chromium = playwright.chromium # 如果一直失败可尝试设置成 False 让其打开浏览器,适当添加 sleep 可查看浏览器状态 browser = chromium.launch(headless=True) browser_context = browser.new_context() browser_context.add_init_script(path=stealth_js_path) context_page = browser_context.new_page() context_page.goto("https://www.xiaohongshu.com") browser_context.add_cookies([ {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}] ) context_page.reload() # 这个地方设置完浏览器 cookie 之后,如果这儿不 sleep 一下签名获取就失败了,如果经常失败请设置长一点试试 sleep(1) encrypt_params = context_page.evaluate("([url, data]) => window._webmsxyw(url, data)", [uri, data]) return { "x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"]) } except Exception as e: # 这儿有时会出现 window._webmsxyw is not a function 或未知跳转错误,因此加一个失败重试趴 logger.warning(f"failed : {e}") pass raise Exception("重试了这么多次还是无法签名成功") xhs_client = XhsClient(cookie, sign=sign) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("comfy-image-mcp-server") # Create a basic server instance mcp = FastMCP(name="XiaoHongShuServer") # You can also add instructions for how to interact with the server mcp_with_instructions = FastMCP( name="HelpfulAssistant", instructions="这个服务是用来发布管理和查看小红书笔记的" ) def create_simple_note(title: str, desc: str, images: []) -> str: note = xhs_client.create_image_note(title, desc, images, is_private=False) return json.dumps(note, ensure_ascii=False, indent=2) @mcp.tool() def publish_xiaohongshu_note(title: str = "", desc = "a cat", images: [] = [ "/path/to/local/demo.jpg", ]) -> str: """publish a xiaohongshu note""" try: url = f"http://{XIAOHONGSHU_SERVER}/create" json = { "title": title, "desc": desc, "images": images} response = httpx.post(url=url, json=json, verify=False, trust_env=False, timeout=60.0) return response.json() except Exception as e: return f"error {e}" if __name__ == "__main__": mcp.run(transport="sse",host="127.0.0.1", port=9001)
CLINE配置MCP Server
CLINE中右上角点击MCP Servers按钮
更新cline_mcp_settings.json配置文件
{ "mcpServers": { "filesystem": { "autoApprove": [ "read_file", "list_allowed_directories", "read_multiple_files", "create_directory", "list_directory", "directory_tree", "search_files", "get_file_info" ], "disabled": false, "timeout": 60, "command": "npx", "args": [ "-y", "@modelcontextprotocol/server-filesystem", "/Users/wangrupeng/Documents/Cline/MCP/filesystem-server" ], "transportType": "stdio" }, "comfyui": { "autoApprove": [ "generate_image_async", "get_image_status", "get_image_status_and_download_to_local" ], "disabled": false, "timeout": 60, "url": "http://127.0.0.1:9001/sse", "transportType": "sse" }, "xiaohongshu": { "disabled": false, "timeout": 60, "url": "http://127.0.0.1:9002/sse", "transportType": "sse", "autoApprove": [ "publish_xiaohongshu_note" ] } } }
5. 感悟和思考
不管是大数据还是AI,核心其实都是数据,任何只要有IO的系统(不管是生理系统,还是计算机系统),IO过程都可以抽象为下面的模型:
我们日常生活中实际上也是在各种输入和输出的场景中切换,MCP实际上是借助大模型打通了人在各种场景的上下文,打通了大模型和物理世界的交互联系。
5.1 商业化场景的思
1. 售卖MCP Server API
- 对一些比较重的复杂的操作,比如商业非开源在线的产品,供应商可以封装内部API,对客户通MCP Server的调用;
2. 订单自动化分析管理
- 电商等平台订单系统API接入MCP Server,可以做到大模型自动帮客户分析订单,制作报表或者商家商品,灵活调整售价等等;
3. 脑机、残疾人机械臂接入大模型
- 有了MCP之后,可以为残障任务定制化场景,让他们能够通过MCP + 大模型完成任务,游戏、工作等;
4. 具身智能机器人场景
- 大模型驱动的移动机器人覆盖路径规划 ;
- 大模型赋能的手术机器人能够理解复杂的手术环境和任务要求 ;
- 智能家居机器人能够理解并执行各种家庭任务;
总之只要有API接口,MCP理论上都能够接入,从这里看能限制MCP应用的只有想象力了。不过这里说的MCP能做,离做好还是有差距的,请看下面的不足分析。
5.2 不足之处
1. 大模型还不够智能
- demo制作过程中尝试过多款模型,最好的大模型依然会有失误的情况,而且一步错可能步步错;
- 精细化的任务很难处理,比如给大模型下一个通过blender建模的工作,生成的blender指令经常是错误的,这个也和大模型掌握的blender版本知识库过旧有关系;
2. MCP Server有一定开发难度
- 基于FastMCP2.0开发的代码量不大,但是中间有很多网络、依赖冲突等方面的坑,目前很多都需要自己解决,网上资料很少;
3. 安全风险
- 身份验证机制还不完善;
- 可在本地执行有风险的操作,比如Filesystem server可能会误修改本地文件;
- 经过大模型“翻译”之后,下达给MCP的指令可能是错的,甚至是有风险的;
- MCP的操作很多不是“原子性”的,比如预定机票,万一定错了可能无法退订,会有资金损失的风险;
- 本地Secret数据可能被第三方mcp server服务器收集,存在泄露风险。
5.3 当前MCP Server的开发技巧和建议
建议使用FastMCP框架:https://github.com/jlowin/fastmcp,其在MCP官方SDK的基础上做了更好的封装,开发更简洁,而且FastMCP已经被官方采纳,官网Demo就是用FastMCP实现的。
1. tool定义的函数和参数命名尽可能的明确 ;
2. 函数内部加tool功能的prompt注解和注释 ;
3. 参数尽量附带默认值,防止调用的时候漏掉参数等 ;
这样mcp list tool和选择tool的时候会对每个tool的功能有更明确的理解。
6. 未来展望
1. 让大模型更智能,通义千问3开源之后我们又往前更近了一步;
2. 应用场景上会有越来越多的MCP应用场景出现,也会有越来越多的server出现;
3. 亟待解决安全类问题,不然商业场景上会存在很大不可靠性。
参考链接:
[1]参考一
[2]参考二
来源 | 阿里云开发者公众号
作者 | 子潍