亚马逊数据采集 API 架构设计:同步 vs 异步的规模化实践

简介: 跨境电商数据平台面临同步采集吞吐瓶颈:1万ASIN同步需14小时,而异步架构通过任务解耦+并行处理,压缩至30~60分钟。响应延迟从5秒降至200ms,吞吐量可弹性扩展,积点成本不变,仅需增加轻量回调服务。适合日均百单以上规模化场景。(239字)

异步亚马逊数据采集 vs 同步 API 调用模式对比示意图.png

业务挑战

跨境电商数据平台在规模化阶段普遍面临同一架构瓶颈:同步采集的吞吐量上限与日益增长的数据需求之间的矛盾。亚马逊商品数据的采集,从单一验证查询扩展到持续性监控时,架构选型的差异会被数据量级放大成量级差异的运营影响。

核心矛盾:Amazon Scrape API 平均响应时间约 5 秒,同步串行采集 1 万个 ASIN 需要近 14 小时,远超大多数业务对数据时效性的容忍边界。异步亚马逊数据采集架构通过任务提交与结果接收的解耦,将此时间压缩至 30~60 分钟级别。


技术选型矩阵

维度 同步模式 异步模式
响应时延 ~5s/次 提交 <200ms,结果回调
吞吐量 受客户端限制(10~50 QPS) 服务端并行扩展
积点成本 JSON: 1点/次 JSON: 1点/次(持平)
部署复杂度 无额外要求 需回调接收服务
适合任务量 <100/天 ≥100/天
数据完整性保障 内联,自然保证 需幂等+补偿机制

系统架构设计

┌──────────────────────────────────────────────────────────┐
│                      调度层                               │
│   APScheduler / AWS EventBridge 定时触发                  │
└──────────────────────────┬───────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────┐
│                     任务提交层                             │
│   POST /api/v1/scrape/async (批量提交)                    │
│   callbackUrl → 你的回调服务端点                           │
│   写入任务注册表(Redis / PostgreSQL)                     │
└──────────────────────────┬───────────────────────────────┘
                           │ Pangolinfo 后台并行处理
                           ▼
┌──────────────────────────────────────────────────────────┐
│                     回调接收层                             │
│   FastAPI / Express 回调服务                              │
│   → taskId 幂等校验                                       │
│   → 数据清洗 → 写入 ClickHouse / PostgreSQL               │
└──────────────────────────┬───────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────┐
│                   监控与补偿层                              │
│   超时任务检测 → 同步 API 补偿查询                          │
│   Prometheus + Grafana 采集成功率监控                      │
└──────────────────────────────────────────────────────────┘

接口参数对比

同步接口

POST https://scrapeapi.pangolinfo.com/api/v1/scrape
Authorization: Bearer <token>

{
   
  "url": "https://www.amazon.com/dp/B0DYTF8L2W",
  "parserName": "amzProductDetail",
  "format": "json",
  "bizContext": {
   "zipcode": "10041"}
}
# 约 5 秒后返回完整结构化数据

异步接口

POST https://scrapeapi.pangolinfo.com/api/v1/scrape/async
Authorization: Bearer <token>

{
   
  "url": "https://www.amazon.com/dp/B0DYTF8L2W",
  "callbackUrl": "https://your-server.com/api/callback",
  "zipcode": "10041",
  "format": "json",
  "parserName": "amzProductDetail"
}
# 约 200ms 返回 taskId,结果通过回调推送

完整代码实现(Python + FastAPI)

"""
企业级异步亚马逊数据采集系统 - 完整实现
技术栈:Python / httpx(异步 HTTP) / FastAPI(回调服务)/ Redis(幂等控制)
"""

import asyncio
import hashlib
import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from typing import Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ─── 配置 ─────────────────────────────────────────────────
PANGOLIN_TOKEN = "your_api_token"
ASYNC_API_URL = "https://scrapeapi.pangolinfo.com/api/v1/scrape/async"
SYNC_API_URL  = "https://scrapeapi.pangolinfo.com/api/v1/scrape"
CALLBACK_URL  = "https://your-domain.com/api/callback"
REDIS_URL     = "redis://localhost:6379"

# ─── 任务提交 ──────────────────────────────────────────────
async def submit_async_task(asin: str, parser_name: str = "amzProductDetail") -> Optional[str]:
    """提交单个异步采集任务,返回 taskId"""
    async with httpx.AsyncClient() as client:
        try:
            resp = await client.post(
                ASYNC_API_URL,
                headers={
   "Authorization": f"Bearer {PANGOLIN_TOKEN}", "Content-Type": "application/json"},
                json={
   
                    "url": f"https://www.amazon.com/dp/{asin}",
                    "callbackUrl": CALLBACK_URL,
                    "zipcode": "10041",
                    "format": "json",
                    "parserName": parser_name
                },
                timeout=10
            )
            data = resp.json()
            if data.get("code") == 0:
                task_id = data["data"]["data"]
                logger.info(f"[SUBMITTED] ASIN={asin} TaskID={task_id}")
                return task_id
            else:
                logger.warning(f"[API ERROR] ASIN={asin}: {data}")
                return None
        except httpx.RequestError as e:
            logger.error(f"[REQUEST ERROR] ASIN={asin}: {e}")
            return None


async def bulk_submit(asins: list[str], rate_limit_delay: float = 0.1) -> dict[str, str]:
    """批量提交异步任务,带速率控制"""
    task_map = {
   }
    for i, asin in enumerate(asins):
        task_id = await submit_async_task(asin)
        if task_id:
            task_map[asin] = task_id
        if (i + 1) % 100 == 0:
            logger.info(f"Progress: {i+1}/{len(asins)}")
        await asyncio.sleep(rate_limit_delay)
    logger.info(f"Submitted {len(task_map)}/{len(asins)} tasks")
    return task_map


# ─── 回调接收服务(FastAPI)─────────────────────────────────
app = FastAPI()
redis_client: Optional[aioredis.Redis] = None

@app.on_event("startup")
async def startup():
    global redis_client
    redis_client = await aioredis.from_url(REDIS_URL)


@app.post("/api/callback")
async def receive_callback(request: Request):
    """接收 Pangolinfo 异步采集回调"""
    # 鉴权
    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")

    payload = await request.json()
    task_id = payload.get("taskId") or payload.get("data", {
   }).get("taskId")

    if not task_id:
        raise HTTPException(status_code=400, detail="Missing taskId")

    # 幂等校验(Redis SET NX)
    key = f"task:processed:{task_id}"
    is_new = await redis_client.set(key, "1", nx=True, ex=86400)  # 24h 过期
    if not is_new:
        logger.info(f"[DUPLICATE] TaskID={task_id}")
        return {
   "code": 0, "message": "duplicate, skipped"}

    # 提取结果数据
    results = (
        payload.get("data", {
   })
               .get("json", [{
   }])[0]
               .get("data", {
   })
               .get("results", [])
    )
    if results:
        product = results[0]
        # 生成数据指纹(用于变更检测)
        data_hash = hashlib.md5(str(product.get("price", "") + str(product.get("star", ""))).encode()).hexdigest()
        logger.info(f"[RECEIVED] TaskID={task_id} Title={product.get('title','N/A')[:40]}... Hash={data_hash}")
        # write_to_database(task_id, product, data_hash)  # 写库逻辑

    return {
   "code": 0, "message": "ok"}


# ─── 补偿机制 ──────────────────────────────────────────────
async def compensate_timeout_tasks(task_map: dict[str, str], timeout_seconds: int = 60):
    """对超时未回调的任务,通过同步 API 进行补偿查询"""
    async with httpx.AsyncClient() as client:
        for asin, task_id in task_map.items():
            # 检查是否已处理(生产环境从数据库查询 status)
            key = f"task:processed:{task_id}"
            already_done = await redis_client.exists(key)
            if already_done:
                continue

            logger.warning(f"[COMPENSATING] ASIN={asin} TaskID={task_id}")
            resp = await client.post(
                SYNC_API_URL,
                headers={
   "Authorization": f"Bearer {PANGOLIN_TOKEN}", "Content-Type": "application/json"},
                json={
   
                    "url": f"https://www.amazon.com/dp/{asin}",
                    "parserName": "amzProductDetail",
                    "site": "", "content": "",
                    "format": "json",
                    "bizContext": {
   "zipcode": "10041"}
                },
                timeout=30
            )
            data = resp.json()
            if data.get("code") == 0:
                logger.info(f"[COMPENSATED] ASIN={asin}")
                await redis_client.set(f"task:processed:{task_id}", "1", ex=86400)

成本效益分析

以日均 5000 个 ASIN 价格监控为例:

方案 完成时间 服务器 CPU 工程成本(一次性)
同步单线程 ~7 小时 0(极简)
同步多线程(20并发) ~21 分钟
异步回调 ~5~10 分钟 中(回调服务)
AMZ Data Tracker 自动调度 无(SaaS)

三种 API 模式积点消耗完全相同(JSON: 1点/次),差异仅在时效与工程复杂度。


实施建议

  1. 任务量 < 100/天:直接使用同步 API,开发成本最低,无需额外基础设施
  2. 任务量 100~1000/天:同步多线程或异步 API 均可,优先评估团队能否维护回调服务
  3. 任务量 > 1000/天:异步 API 是唯一可持续的选择,工程投入一次性,长期收益
相关文章
|
存储 缓存 文件存储
如何保证分布式文件系统的数据一致性
分布式文件系统需要向上层应用提供透明的客户端缓存,从而缓解网络延时现象,更好地支持客户端性能水平扩展,同时也降低对文件服务器的访问压力。当考虑客户端缓存的时候,由于在客户端上引入了多个本地数据副本(Replica),就相应地需要提供客户端对数据访问的全局数据一致性。
32698 79
如何保证分布式文件系统的数据一致性
|
前端开发 容器
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局(上)
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局
17750 20
|
设计模式 存储 监控
设计模式(C++版)
看懂UML类图和时序图30分钟学会UML类图设计原则单一职责原则定义:单一职责原则,所谓职责是指类变化的原因。如果一个类有多于一个的动机被改变,那么这个类就具有多于一个的职责。而单一职责原则就是指一个类或者模块应该有且只有一个改变的原因。bad case:IPhone类承担了协议管理(Dial、HangUp)、数据传送(Chat)。good case:里式替换原则定义:里氏代换原则(Liskov 
36682 19
设计模式(C++版)
|
存储 编译器 C语言
抽丝剥茧C语言(初阶 下)(下)
抽丝剥茧C语言(初阶 下)
|
机器学习/深度学习 人工智能 自然语言处理
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
24758 14
|
机器学习/深度学习 弹性计算 监控
重生之---我测阿里云U1实例(通用算力型)
阿里云产品全线降价的一力作,2023年4月阿里云推出新款通用算力型ECS云服务器Universal实例,该款服务器的真实表现如何?让我先测为敬!
36660 15
重生之---我测阿里云U1实例(通用算力型)
|
SQL 存储 弹性计算
Redis性能高30%,阿里云倚天ECS性能摸底和迁移实践
Redis在倚天ECS环境下与同规格的基于 x86 的 ECS 实例相比,Redis 部署在基于 Yitian 710 的 ECS 上可获得高达 30% 的吞吐量优势。成本方面基于倚天710的G8y实例售价比G7实例低23%,总性价比提高50%;按照相同算法,相对G8a,性价比为1.4倍左右。
|
存储 算法 Java
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的限流器RateLimiter功能服务
随着互联网的快速发展,越来越多的应用程序需要处理大量的请求。如果没有限制,这些请求可能会导致应用程序崩溃或变得不可用。因此,限流器是一种非常重要的技术,可以帮助应用程序控制请求的数量和速率,以保持稳定和可靠的运行。
29838 52

热门文章

最新文章

下一篇
开通oss服务