
业务挑战
跨境电商数据平台在规模化阶段普遍面临同一架构瓶颈:同步采集的吞吐量上限与日益增长的数据需求之间的矛盾。亚马逊商品数据的采集,从单一验证查询扩展到持续性监控时,架构选型的差异会被数据量级放大成量级差异的运营影响。
核心矛盾:Amazon Scrape API 平均响应时间约 5 秒,同步串行采集 1 万个 ASIN 需要近 14 小时,远超大多数业务对数据时效性的容忍边界。异步亚马逊数据采集架构通过任务提交与结果接收的解耦,将此时间压缩至 30~60 分钟级别。
技术选型矩阵
| 维度 | 同步模式 | 异步模式 |
|---|---|---|
| 响应时延 | ~5s/次 | 提交 <200ms,结果回调 |
| 吞吐量 | 受客户端限制(10~50 QPS) | 服务端并行扩展 |
| 积点成本 | JSON: 1点/次 | JSON: 1点/次(持平) |
| 部署复杂度 | 无额外要求 | 需回调接收服务 |
| 适合任务量 | <100/天 | ≥100/天 |
| 数据完整性保障 | 内联,自然保证 | 需幂等+补偿机制 |
系统架构设计
┌──────────────────────────────────────────────────────────┐
│ 调度层 │
│ APScheduler / AWS EventBridge 定时触发 │
└──────────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 任务提交层 │
│ POST /api/v1/scrape/async (批量提交) │
│ callbackUrl → 你的回调服务端点 │
│ 写入任务注册表(Redis / PostgreSQL) │
└──────────────────────────┬───────────────────────────────┘
│ Pangolinfo 后台并行处理
▼
┌──────────────────────────────────────────────────────────┐
│ 回调接收层 │
│ FastAPI / Express 回调服务 │
│ → taskId 幂等校验 │
│ → 数据清洗 → 写入 ClickHouse / PostgreSQL │
└──────────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ 监控与补偿层 │
│ 超时任务检测 → 同步 API 补偿查询 │
│ Prometheus + Grafana 采集成功率监控 │
└──────────────────────────────────────────────────────────┘
接口参数对比
同步接口
POST https://scrapeapi.pangolinfo.com/api/v1/scrape
Authorization: Bearer <token>
{
"url": "https://www.amazon.com/dp/B0DYTF8L2W",
"parserName": "amzProductDetail",
"format": "json",
"bizContext": {
"zipcode": "10041"}
}
# 约 5 秒后返回完整结构化数据
异步接口
POST https://scrapeapi.pangolinfo.com/api/v1/scrape/async
Authorization: Bearer <token>
{
"url": "https://www.amazon.com/dp/B0DYTF8L2W",
"callbackUrl": "https://your-server.com/api/callback",
"zipcode": "10041",
"format": "json",
"parserName": "amzProductDetail"
}
# 约 200ms 返回 taskId,结果通过回调推送
完整代码实现(Python + FastAPI)
"""
企业级异步亚马逊数据采集系统 - 完整实现
技术栈:Python / httpx(异步 HTTP) / FastAPI(回调服务)/ Redis(幂等控制)
"""
import asyncio
import hashlib
import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ─── 配置 ─────────────────────────────────────────────────
PANGOLIN_TOKEN = "your_api_token"
ASYNC_API_URL = "https://scrapeapi.pangolinfo.com/api/v1/scrape/async"
SYNC_API_URL = "https://scrapeapi.pangolinfo.com/api/v1/scrape"
CALLBACK_URL = "https://your-domain.com/api/callback"
REDIS_URL = "redis://localhost:6379"
# ─── 任务提交 ──────────────────────────────────────────────
async def submit_async_task(asin: str, parser_name: str = "amzProductDetail") -> Optional[str]:
"""提交单个异步采集任务,返回 taskId"""
async with httpx.AsyncClient() as client:
try:
resp = await client.post(
ASYNC_API_URL,
headers={
"Authorization": f"Bearer {PANGOLIN_TOKEN}", "Content-Type": "application/json"},
json={
"url": f"https://www.amazon.com/dp/{asin}",
"callbackUrl": CALLBACK_URL,
"zipcode": "10041",
"format": "json",
"parserName": parser_name
},
timeout=10
)
data = resp.json()
if data.get("code") == 0:
task_id = data["data"]["data"]
logger.info(f"[SUBMITTED] ASIN={asin} TaskID={task_id}")
return task_id
else:
logger.warning(f"[API ERROR] ASIN={asin}: {data}")
return None
except httpx.RequestError as e:
logger.error(f"[REQUEST ERROR] ASIN={asin}: {e}")
return None
async def bulk_submit(asins: list[str], rate_limit_delay: float = 0.1) -> dict[str, str]:
"""批量提交异步任务,带速率控制"""
task_map = {
}
for i, asin in enumerate(asins):
task_id = await submit_async_task(asin)
if task_id:
task_map[asin] = task_id
if (i + 1) % 100 == 0:
logger.info(f"Progress: {i+1}/{len(asins)}")
await asyncio.sleep(rate_limit_delay)
logger.info(f"Submitted {len(task_map)}/{len(asins)} tasks")
return task_map
# ─── 回调接收服务(FastAPI)─────────────────────────────────
app = FastAPI()
redis_client: Optional[aioredis.Redis] = None
@app.on_event("startup")
async def startup():
global redis_client
redis_client = await aioredis.from_url(REDIS_URL)
@app.post("/api/callback")
async def receive_callback(request: Request):
"""接收 Pangolinfo 异步采集回调"""
# 鉴权
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Unauthorized")
payload = await request.json()
task_id = payload.get("taskId") or payload.get("data", {
}).get("taskId")
if not task_id:
raise HTTPException(status_code=400, detail="Missing taskId")
# 幂等校验(Redis SET NX)
key = f"task:processed:{task_id}"
is_new = await redis_client.set(key, "1", nx=True, ex=86400) # 24h 过期
if not is_new:
logger.info(f"[DUPLICATE] TaskID={task_id}")
return {
"code": 0, "message": "duplicate, skipped"}
# 提取结果数据
results = (
payload.get("data", {
})
.get("json", [{
}])[0]
.get("data", {
})
.get("results", [])
)
if results:
product = results[0]
# 生成数据指纹(用于变更检测)
data_hash = hashlib.md5(str(product.get("price", "") + str(product.get("star", ""))).encode()).hexdigest()
logger.info(f"[RECEIVED] TaskID={task_id} Title={product.get('title','N/A')[:40]}... Hash={data_hash}")
# write_to_database(task_id, product, data_hash) # 写库逻辑
return {
"code": 0, "message": "ok"}
# ─── 补偿机制 ──────────────────────────────────────────────
async def compensate_timeout_tasks(task_map: dict[str, str], timeout_seconds: int = 60):
"""对超时未回调的任务,通过同步 API 进行补偿查询"""
async with httpx.AsyncClient() as client:
for asin, task_id in task_map.items():
# 检查是否已处理(生产环境从数据库查询 status)
key = f"task:processed:{task_id}"
already_done = await redis_client.exists(key)
if already_done:
continue
logger.warning(f"[COMPENSATING] ASIN={asin} TaskID={task_id}")
resp = await client.post(
SYNC_API_URL,
headers={
"Authorization": f"Bearer {PANGOLIN_TOKEN}", "Content-Type": "application/json"},
json={
"url": f"https://www.amazon.com/dp/{asin}",
"parserName": "amzProductDetail",
"site": "", "content": "",
"format": "json",
"bizContext": {
"zipcode": "10041"}
},
timeout=30
)
data = resp.json()
if data.get("code") == 0:
logger.info(f"[COMPENSATED] ASIN={asin}")
await redis_client.set(f"task:processed:{task_id}", "1", ex=86400)
成本效益分析
以日均 5000 个 ASIN 价格监控为例:
| 方案 | 完成时间 | 服务器 CPU | 工程成本(一次性) |
|---|---|---|---|
| 同步单线程 | ~7 小时 | 低 | 0(极简) |
| 同步多线程(20并发) | ~21 分钟 | 高 | 低 |
| 异步回调 | ~5~10 分钟 | 低 | 中(回调服务) |
| AMZ Data Tracker | 自动调度 | 无 | 无(SaaS) |
三种 API 模式积点消耗完全相同(JSON: 1点/次),差异仅在时效与工程复杂度。
实施建议
- 任务量 < 100/天:直接使用同步 API,开发成本最低,无需额外基础设施
- 任务量 100~1000/天:同步多线程或异步 API 均可,优先评估团队能否维护回调服务
- 任务量 > 1000/天:异步 API 是唯一可持续的选择,工程投入一次性,长期收益