摘要:本文完整演示如何搭建一条从"需求输入"到"成品输出"的自动化AI内容生产流水线,涵盖Prompt模板引擎、批量任务执行、质量检测、多平台适配等模块。代码示例基于瑞思AI(Micrease)的异步API接口实现,可直接对接其他平台API使用。
一、为什么需要内容生产流水线?
当企业内容生产进入规模化阶段,单点调用AI API已经不够了。
典型场景:某电商企业大促期间,需要在3天内产出500+个商品的主图、场景图、详情页视频。
手动操作:每个商品写Prompt → 调用API → 等待结果 → 检查质量 → 调整尺寸 → 上传系统。一个商品至少30分钟,500个商品 ≈ 31个工作日。
自动化流水线:准备商品数据 → 自动生成Prompt → 批量调用API → 自动质检 → 自动适配 → 推送到业务系统。500个商品 ≈ 3-5小时。
二、流水线架构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 数据输入层 │ → │ Prompt引擎层 │ → │ AI生成执行层 │ → │ 后处理层 │
│ · 商品数据 │ │ · 模板选择 │ │ · 图片生成 │ │ · 质量检测 │
│ · 营销需求 │ │ · 变量替换 │ │ · 视频生成 │ │ · 尺寸适配 │
│ · 品牌规范 │ │ · 风格注入 │ │ · 批量调度 │ │ · 格式转换 │
└─────────────┘ └─────────────┘ └─────────────┘ └──────┬──────┘
↓
┌─────────────┐
│ 输出分发层 │
│ · CDN上传 │
│ · 业务系统 │
│ · 报告生成 │
└─────────────┘
三、模块一:Prompt模板引擎
3.1 模板定义
from dataclasses import dataclass
from string import Template
from typing import Dict, List, Optional
@dataclass
class PromptTemplate:
"""Prompt模板"""
template_id: str
name: str
category: str # ecommerce / brand / social / product
template_str: str # 模板字符串,变量用 ${var} 表示
variables: List[str] # 必填变量列表
optional_vars: List[str] # 可选变量列表
ratio: str = "1:1"
negative_prompt: str = ""
# 模板库示例
TEMPLATE_LIBRARY = {
"ecommerce_scene": PromptTemplate(
template_id="ecommerce_scene",
name="电商产品场景图",
category="ecommerce",
template_str=(
"${product_name}放置在${scene_description}中,"
"${lighting_style},${color_tone}色调,"
"商业产品摄影风格,高清晰度,${brand_style}"
),
variables=["product_name", "scene_description"],
optional_vars=["lighting_style", "color_tone", "brand_style"],
ratio="1:1",
negative_prompt="模糊、畸变、文字水印、低质量"
),
"brand_hero": PromptTemplate(
template_id="brand_hero",
name="品牌宣传主图",
category="brand",
template_str=(
"${brand_theme}主题,${scene_description},"
"${mood}氛围,${brand_style},"
"专业广告摄影,杂志封面质感,${color_palette}"
),
variables=["brand_theme", "scene_description"],
optional_vars=["mood", "brand_style", "color_palette"],
ratio="16:9"
),
}
3.2 渲染引擎
class PromptRenderer:
"""Prompt渲染器"""
def __init__(self, brand_model_id: Optional[str] = None):
self.brand_model_id = brand_model_id
self.brand_config = {
}
def load_brand_config(self, brand_config: dict):
self.brand_config = brand_config
def render(
self,
template: PromptTemplate,
variables: Dict[str, str],
brand_overrides: Optional[Dict[str, str]] = None
) -> dict:
merged_vars = {
}
for var in template.variables + template.optional_vars:
if var in variables:
merged_vars[var] = variables[var]
elif brand_overrides and var in brand_overrides:
merged_vars[var] = brand_overrides[var]
elif var in self.brand_config:
merged_vars[var] = self.brand_config[var]
elif var in template.optional_vars:
merged_vars[var] = ""
else:
raise ValueError(f"缺少必填变量: {var}")
rendered = Template(template.template_str).safe_substitute(merged_vars)
rendered = rendered.replace(",,", ",").replace(" ", " ").strip(", ")
return {
"prompt": rendered,
"negative_prompt": template.negative_prompt,
"ratio": template.ratio,
"model_id": self.brand_model_id,
}
四、模块二:批量任务执行器
这里以瑞思AI的异步API接口为例。其接口设计为标准的"提交→轮询"模式,与其他平台(Replicate、Fal.ai等)大同小异,替换base_url和鉴权方式即可复用。
4.1 核心执行器
import asyncio
import aiohttp
import time
from dataclasses import dataclass, field
from typing import List, Callable, Optional, Dict
@dataclass
class GenerationJob:
job_id: str
task_type: str # image_generation / fast_video
prompt: str
params: dict
priority: int = 3
status: str = "PENDING"
result_urls: List[str] = field(default_factory=list)
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
class PipelineExecutor:
"""流水线执行器"""
def __init__(self, api_base_url: str, api_key: str, max_concurrent: int = 10):
self.api_base_url = api_base_url
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
async def submit_job(self, session: aiohttp.ClientSession, job: GenerationJob) -> str:
async with self.semaphore:
payload = {
"type": job.task_type, "prompt": job.prompt, **job.params}
async with session.post(
f"{self.api_base_url}/generation",
json=payload,
headers={
"Authorization": self.api_key}
) as resp:
result = await resp.json()
job.status = "SUBMITTED"
return result["data"]
async def poll_task(self, session: aiohttp.ClientSession, task_id: str) -> dict:
while True:
async with session.post(
f"{self.api_base_url}/task",
json={
"taskId": task_id},
headers={
"Authorization": self.api_key}
) as resp:
result = await resp.json()
status = result["data"]["status"]
if status == "SUCCESS":
return result["data"]
elif status == "FAILED":
raise Exception(result["data"].get("failReason", "生成失败"))
await asyncio.sleep(5)
async def execute_pipeline(self, jobs: List[GenerationJob]) -> List[GenerationJob]:
async with aiohttp.ClientSession() as session:
# Phase 1: 批量提交
task_id_map = {
}
for job in jobs:
task_id = await self.submit_job(session, job)
task_id_map[task_id] = job
print(f"已提交 {len(task_id_map)} 个任务")
# Phase 2: 并发轮询
async def poll_and_update(task_id, job):
try:
result = await self.poll_task(session, task_id)
job.status = "SUCCESS"
job.result_urls = result.get(
"imageTask", result.get("videoTask", {
})
).get("urls", [])
job.completed_at = time.time()
except Exception as e:
job.status = "FAILED"
job.error = str(e)
await asyncio.gather(*[
poll_and_update(tid, j) for tid, j in task_id_map.items()
])
success = sum(1 for j in jobs if j.status == "SUCCESS")
print(f"完成: 成功 {success}, 失败 {len(jobs) - success}")
return jobs
4.2 使用示例
async def run_ecommerce_pipeline():
# 初始化(瑞思AI API示例)
executor = PipelineExecutor(
api_base_url="https://ai.micrease.com/open/api/v1/resource/aigc",
api_key="sk-your-api-key",
max_concurrent=5
)
renderer = PromptRenderer(brand_model_id="brand_model_xxx")
renderer.load_brand_config({
"brand_style": "北欧简约风格,温暖质感",
"lighting_style": "柔和自然光",
})
products = [
{
"name": "智能台灯", "scene": "现代书房桌面", "type": "image_generation"},
{
"name": "蓝牙音箱", "scene": "客厅茶几上", "type": "image_generation"},
{
"name": "咖啡机", "scene": "开放式厨房台面", "type": "fast_video"},
]
jobs = []
for product in products:
rendered = renderer.render(
template=TEMPLATE_LIBRARY["ecommerce_scene"],
variables={
"product_name": product["name"], "scene_description": product["scene"]}
)
jobs.append(GenerationJob(
job_id=f"job_{product['name']}",
task_type=product["type"],
prompt=rendered["prompt"],
params={
"ratio": rendered["ratio"], "fast": False, "resolution": "HD"}
))
completed = await executor.execute_pipeline(jobs)
return {
"total": len(completed), "success": sum(1 for j in completed if j.status == "SUCCESS")}
report = asyncio.run(run_ecommerce_pipeline())
五、模块三:自动质量检测
from PIL import Image
import io
import numpy as np
class QualityChecker:
def __init__(self):
self.rules = []
def add_rule(self, rule_func, name: str, severity: str = "error"):
self.rules.append({
"func": rule_func, "name": name, "severity": severity})
async def check(self, image_url: str) -> dict:
image_data = await self._download_image(image_url)
image = Image.open(io.BytesIO(image_data))
results = {
"url": image_url, "passed": True, "issues": []}
for rule in self.rules:
try:
passed, message = rule["func"](image, image_data)
if not passed:
results["issues"].append({
"rule": rule["name"], "severity": rule["severity"], "message": message})
if rule["severity"] == "error":
results["passed"] = False
except Exception as e:
results["issues"].append({
"rule": rule["name"], "severity": "error", "message": str(e)})
results["passed"] = False
return results
# 质检规则
def check_resolution(image, data, min_w=800, min_h=800):
w, h = image.size
return (True, "") if w >= min_w and h >= min_h else (False, f"分辨率不足: {w}x{h}")
def check_aspect_ratio(image, data, expected="1:1"):
actual = image.size[0] / image.size[1]
expected_map = {
"1:1": 1.0, "16:9": 16/9, "9:16": 9/16, "4:3": 4/3, "3:4": 3/4}
return (True, "") if abs(actual - expected_map.get(expected, 1.0)) <= 0.05 else (False, f"宽高比不匹配")
def check_not_blank(image, data):
std = np.array(image).std()
return (True, "") if std >= 5 else (False, f"图片可能为空白")
# 注册
checker = QualityChecker()
checker.add_rule(check_resolution, "分辨率", "error")
checker.add_rule(lambda img, d: check_aspect_ratio(img, d, "1:1"), "宽高比", "error")
checker.add_rule(check_not_blank, "空白检测", "error")
六、模块四:多平台尺寸适配
class PlatformAdapter:
SPECS = {
"taobao_main": (800, 800),
"jd_main": (800, 800),
"douyin_cover": (1080, 1920),
"xiaohongshu": (1080, 1440),
"wechat_article": (900, 383),
"weibo": (1080, 1080),
}
def adapt(self, image: Image.Image, platform: str) -> Image.Image:
w, h = self.SPECS.get(platform, (800, 800))
ratio = max(w / image.width, h / image.height)
resized = image.resize((int(image.width * ratio), int(image.height * ratio)), Image.LANCZOS)
left = (resized.width - w) // 2
top = (resized.height - h) // 2
return resized.crop((left, top, left + w, top + h))
七、完整流水线串联
async def full_pipeline(product_data: list, platforms: list):
executor = PipelineExecutor(
api_base_url="https://ai.micrease.com/open/api/v1/resource/aigc",
api_key="sk-your-key",
max_concurrent=5
)
renderer = PromptRenderer()
checker = QualityChecker()
adapter = PlatformAdapter()
# Step 1: 生成Prompt
jobs = []
for p in product_data:
rendered = renderer.render(
template=TEMPLATE_LIBRARY["ecommerce_scene"],
variables={
"product_name": p["name"], "scene_description": p["scene"]}
)
jobs.append(GenerationJob(
job_id=p["sku"], task_type="image_generation",
prompt=rendered["prompt"],
params={
"ratio": "1:1", "fast": False, "resolution": "HD"}
))
# Step 2: 批量生成
completed = await executor.execute_pipeline(jobs)
# Step 3: 质检
for job in completed:
if job.status == "SUCCESS":
for url in job.result_urls:
qc = await checker.check(url)
if not qc["passed"]:
print(f"质检不通过 {job.job_id}: {qc['issues']}")
# Step 4: 多平台适配
results = []
for job in completed:
if job.status == "SUCCESS" and job.result_urls:
image = Image.open(io.BytesIO(await download(job.result_urls[0])))
for platform in platforms:
adapted = adapter.adapt(image, platform)
oss_url = await upload_to_oss(adapted, f"{job.job_id}_{platform}.jpg")
results.append({
"sku": job.job_id, "platform": platform, "url": oss_url})
return results
八、性能优化要点
并发控制:asyncio.Semaphore 控制并发数,避免触发API频率限制,建议5-10。
缓存策略:相同Prompt+参数的结果缓存24h,Redis存储,实测节省30-50%的API调用。
断点续传:持久化已完成任务ID,中断后从断点恢复。
Webhook替代轮询:高并发场景下用回调替代轮询。瑞思AI等平台支持 callback_url 配置,收到完成通知后直接处理,不用一直poll。
九、总结
本文搭建的流水线四大模块:
- Prompt模板引擎:标准化、可复用、品牌一致
- 批量执行器:异步并发、自动重试、进度可追踪
- 自动质检:规则可扩展、问题可定位
- 多平台适配:一次生成、多处使用
对接API时,核心关注:异步任务机制是否完善、是否支持Webhook回调、是否支持品牌微调模型。文中代码以瑞思AI(ai.micrease.com)为例,其REST接口设计为标准模式,替换base_url和鉴权方式即可适配其他平台。