Cdiscount 批量商品数据采集方案:API 限流处理与高效分页实现

简介: Cdiscount 作为法国头部电商平台,其开放 API 对批量采集有明确的限流规则(默认 100 次 / 分钟)和分页限制(单请求最多返回 20 个商品),直接高频请求易触发封禁,分页低效则导致采集耗时过长。本文基于 Python 实现「合规限流 + 高效分页 + 批量容错」的完整采集方案,兼顾采集效率与平台规则,适配千级 / 万级商品批量采集场景。

Cdiscount 作为法国头部电商平台,其开放 API 对批量采集有明确的限流规则(默认 100 次 / 分钟)和分页限制(单请求最多返回 20 个商品),直接高频请求易触发封禁,分页低效则导致采集耗时过长。本文基于 Python 实现「合规限流 + 高效分页 + 批量容错」的完整采集方案,兼顾采集效率与平台规则,适配千级 / 万级商品批量采集场景。

一、核心前提(合规与基础信息)

1. 授权与 API 基础

  • 已在 Cdiscount 开发者平台获取Client ID/Client Secret,并通过 OAuth 2.0 获取有效 Access Token(有效期 1 小时);
  • 批量商品 API 端点:https://api.cdiscount.com/OpenApi/json/GetProduct(支持单次传入 20 个商品 ID);
  • 类目批量查询 API(可选):https://api.cdiscount.com/OpenApi/json/GetCategoryProducts(按类目分页采集商品 ID)。

2. 核心限制规则

限制类型 规则要求 违规后果
频率限流 100 次 / 分钟(≈0.6 秒 / 次) 429 错误、临时封禁 10-30 分钟
单请求商品数 最多 20 个商品 ID 超出部分 API 直接忽略
分页深度 类目分页最多返回 1000 页(2 万商品) 超出页数无数据返回
Token 有效期 1 小时 401 错误,请求失效

二、完整实现方案(Python)

1. 环境依赖

bash

运行

pip install requests  # HTTP请求
pip install ratelimit  # 精准限流(可选,替代手动延迟)
pip install python-dotenv  # 敏感信息管理

2. 核心代码(限流 + 分页 + 批量采集)

python

运行

import requests
import json
import time
import math
from typing import List, Dict, Optional, Generator
import logging
from ratelimit import limits, sleep_and_retry
from dotenv import load_dotenv
import os
# -------------------------- 基础配置 --------------------------
# 加载.env文件(存储Client ID/Secret,避免硬编码)
load_dotenv()
CLIENT_ID = os.getenv("CDISCOUNT_CLIENT_ID")
CLIENT_SECRET = os.getenv("CDISCOUNT_CLIENT_SECRET")
# API地址
TOKEN_URL = "https://api.cdiscount.com/OpenApi/json/AccessToken"
BATCH_PRODUCT_API = "https://api.cdiscount.com/OpenApi/json/GetProduct"
CATEGORY_PRODUCT_API = "https://api.cdiscount.com/OpenApi/json/GetCategoryProducts"
# 限流与分页配置
RATE_LIMIT = 100  # 每分钟最多请求数
RATE_LIMIT_PERIOD = 60  # 限流周期(秒)
BATCH_SIZE = 20  # 单请求最大商品数
PAGE_SIZE = 20  # 类目分页单页商品数
RETRY_TIMES = 3  # 异常重试次数
TIMEOUT = 15  # 请求超时时间
# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("cdiscount_batch.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# -------------------------- 通用工具函数 --------------------------
def get_access_token() -> Optional[str]:
    """获取Token(带重试,有效期1小时)"""
    payload = {"parameters": {"Login": CLIENT_ID, "Password": CLIENT_SECRET}}
    for retry in range(RETRY_TIMES):
        try:
            response = requests.post(
                TOKEN_URL,
                data=json.dumps(payload),
                headers={"Content-Type": "application/json"},
                timeout=TIMEOUT
            )
            response.raise_for_status()
            token_data = response.json()
            if token_data.get("Success") and token_data.get("Token"):
                logger.info("Token获取成功")
                return token_data["Token"]
            else:
                logger.error(f"Token获取失败:{token_data.get('ErrorMessage', '未知错误')}")
                return None
        except Exception as e:
            logger.error(f"Token获取异常(重试{retry+1}/{RETRY_TIMES}):{str(e)}")
            if retry < RETRY_TIMES - 1:
                time.sleep(2 ** retry)
    return None
@sleep_and_retry  # 触发限流时自动休眠重试
@limits(calls=RATE_LIMIT, period=RATE_LIMIT_PERIOD)
def limited_request(url: str, payload: Dict, headers: Dict) -> Optional[Dict]:
    """带精准限流的请求函数(确保不超100次/分钟)"""
    try:
        response = requests.post(
            url,
            data=json.dumps(payload),
            headers=headers,
            timeout=TIMEOUT
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"限流请求异常:{str(e)}")
        return None
def safe_extract_field(data: Dict, field_path: str, default: any = None) -> any:
    """安全提取嵌套字段(容错字段缺失)"""
    keys = field_path.split(".")
    value = data
    for key in keys:
        if isinstance(value, dict) and key in value:
            value = value[key]
        else:
            return default
    return value
# -------------------------- 方案1:指定商品ID批量采集(已知ID列表) --------------------------
def split_product_ids(product_ids: List[str]) -> Generator[List[str], None, None]:
    """
    拆分商品ID列表为批量(每批20个)
    :param product_ids: 完整商品ID列表
    :return: 分批生成器(避免一次性加载大列表)
    """
    for i in range(0, len(product_ids), BATCH_SIZE):
        yield product_ids[i:i + BATCH_SIZE]
def batch_fetch_products_by_ids(product_ids: List[str], token: str) -> List[Dict]:
    """
    按商品ID批量采集(适配限流+分批)
    :param product_ids: 待采集商品ID列表(可上千/万条)
    :param token: 有效Token
    :return: 解析后的商品数据列表
    """
    if not product_ids:
        logger.warning("无商品ID可采集")
        return []
    
    # 拆分ID列表为批量
    batches = split_product_ids(product_ids)
    all_products = []
    headers = {"Content-Type": "application/json"}
    for batch_idx, batch_ids in enumerate(batches, 1):
        logger.info(f"开始采集第{batch_idx}批商品,共{len(batch_ids)}个ID")
        
        # 构造批量请求参数
        payload = {
            "Token": token,
            "Parameters": {
                "ProductIdList": batch_ids,
                "WithProductStock": True,
                "WithProductPrice": True,
                "WithProductAttribute": True
            }
        }
        # 带限流的请求
        response_json = limited_request(BATCH_PRODUCT_API, payload, headers)
        if not response_json or not response_json.get("Success"):
            logger.error(f"第{batch_idx}批采集失败:{response_json.get('ErrorMessage', '未知错误') if response_json else '无响应'}")
            continue
        # 解析该批商品数据
        batch_products = response_json.get("Products", [])
        for product in batch_products:
            parsed_product = {
                "商品ID": safe_extract_field(product, "Id", "未知ID"),
                "标题": safe_extract_field(product, "Name", "无标题"),
                "品牌": safe_extract_field(product, "Brand", "未知品牌"),
                "售价(€)": safe_extract_field(product, "Price", 0.0),
                "库存": safe_extract_field(product, "Stock", 0),
                "类目ID": safe_extract_field(product, "CategoryId", 0),
                "SKU数": len(safe_extract_field(product, "Skus", []))
            }
            all_products.append(parsed_product)
        
        logger.info(f"第{batch_idx}批采集完成,新增{len(batch_products)}个商品数据")
    logger.info(f"批量采集完成,共获取{len(all_products)}/{len(product_ids)}个商品数据")
    return all_products
# -------------------------- 方案2:按类目分页采集(未知ID,按类目批量获取) --------------------------
def get_category_product_total(category_id: int, token: str) -> int:
    """获取指定类目下商品总数(用于计算分页)"""
    payload = {
        "Token": token,
        "Parameters": {
            "CategoryId": category_id,
            "PageNumber": 1,
            "PageSize": 1,  # 仅需获取总数,单页1条即可
            "WithProductCount": True
        }
    }
    headers = {"Content-Type": "application/json"}
    response_json = limited_request(CATEGORY_PRODUCT_API, payload, headers)
    if not response_json or not response_json.get("Success"):
        logger.error(f"获取类目{category_id}商品总数失败")
        return 0
    return safe_extract_field(response_json, "TotalProductCount", 0)
def batch_fetch_products_by_category(category_id: int, token: str) -> List[Dict]:
    """
    按类目分页采集商品(高效分页+限流)
    :param category_id: Cdiscount类目ID(如电子产品类目ID)
    :param token: 有效Token
    :return: 类目下所有商品数据
    """
    # 第一步:获取类目商品总数,计算总页数
    total_count = get_category_product_total(category_id, token)
    if total_count == 0:
        logger.warning(f"类目{category_id}无商品数据")
        return []
    
    total_pages = math.ceil(total_count / PAGE_SIZE)
    # 限制最大页数(避免平台限制的1000页上限)
    total_pages = min(total_pages, 1000)
    logger.info(f"类目{category_id}共{total_count}个商品,需采集{total_pages}页")
    all_products = []
    headers = {"Content-Type": "application/json"}
    # 第二步:分页采集(按页请求,带限流)
    for page in range(1, total_pages + 1):
        logger.info(f"开始采集类目{category_id}第{page}/{total_pages}页")
        
        payload = {
            "Token": token,
            "Parameters": {
                "CategoryId": category_id,
                "PageNumber": page,
                "PageSize": PAGE_SIZE,
                "WithProductStock": True,
                "WithProductPrice": True
            }
        }
        response_json = limited_request(CATEGORY_PRODUCT_API, payload, headers)
        if not response_json or not response_json.get("Success"):
            logger.error(f"类目{category_id}第{page}页采集失败")
            continue
        # 解析当前页商品
        page_products = response_json.get("Products", [])
        for product in page_products:
            parsed_product = {
                "商品ID": safe_extract_field(product, "Id", "未知ID"),
                "标题": safe_extract_field(product, "Name", "无标题"),
                "售价(€)": safe_extract_field(product, "Price", 0.0),
                "库存": safe_extract_field(product, "Stock", 0),
                "类目ID": category_id
            }
            all_products.append(parsed_product)
        
        logger.info(f"类目{category_id}第{page}页采集完成,新增{len(page_products)}个商品")
        # 可选:每采集10页刷新一次Token(避免Token过期)
        if page % 10 == 0:
            new_token = get_access_token()
            if new_token:
                token = new_token
                logger.info("Token已刷新,继续采集")
    logger.info(f"类目{category_id}采集完成,共获取{len(all_products)}个商品数据")
    return all_products
# -------------------------- 数据存储(可选,CSV/Excel) --------------------------
def save_products_to_csv(products: List[Dict], filename: str = "cdiscount_batch_products.csv"):
    """将批量采集数据保存为CSV(容错字段缺失)"""
    if not products:
        logger.warning("无数据可保存")
        return
    
    import csv
    # 动态获取所有字段(避免部分商品字段缺失)
    headers = set()
    for p in products:
        headers.update(p.keys())
    headers = list(headers)
    try:
        with open(filename, "w", encoding="utf-8-sig", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=headers, restval="")
            writer.writeheader()
            writer.writerows(products)
        logger.info(f"批量数据已保存到:{filename}")
    except Exception as e:
        logger.error(f"保存CSV失败:{str(e)}")
# -------------------------- 主函数(测试调用) --------------------------
if __name__ == "__main__":
    # 1. 获取Token
    token = get_access_token()
    if not token:
        logger.critical("Token获取失败,终止程序")
        exit(1)
    # 场景1:已知商品ID列表,批量采集(示例:100个商品ID)
    # 实际使用时替换为真实商品ID列表
    test_product_ids = [f"123456789{i}" for i in range(1, 101)]
    batch_products_by_ids = batch_fetch_products_by_ids(test_product_ids, token)
    save_products_to_csv(batch_products_by_ids, "cdiscount_batch_by_ids.csv")
    # 场景2:按类目分页采集(示例:电子产品类目ID,需替换为真实ID)
    # test_category_id = 1000  # 替换为真实类目ID
    # batch_products_by_category = batch_fetch_products_by_category(test_category_id, token)
    # save_products_to_csv(batch_products_by_category, "cdiscount_batch_by_category.csv")

三、核心方案拆解

1. 精准限流处理(避免触发平台封禁)

方案选型:

  • 基础方案:手动添加time.sleep(0.6)(100 次 / 分钟 = 0.6 秒 / 次),适合简单场景;
  • 进阶方案:使用ratelimit库的@limits装饰器,精准控制每分钟请求数,触发限流时自动休眠重试(推荐生产环境使用)。

关键逻辑:

python

运行

@sleep_and_retry
@limits(calls=100, period=60)  # 每分钟最多100次请求
def limited_request(url: str, payload: Dict, headers: Dict) -> Optional[Dict]:
    # 请求逻辑...
  • @sleep_and_retry:当请求次数超出限流时,自动休眠至下一个周期再重试;
  • 无需手动计算延迟,库自动适配限流规则,避免人为计算误差。

2. 高效分页实现(两种场景适配)

场景 1:已知商品 ID 列表(精准批量)

  • 分批策略:将大 ID 列表拆分为每批 20 个(平台单请求上限),通过生成器split_product_ids分批返回,避免一次性加载大列表占用内存;
  • 容错处理:单批采集失败不中断整体流程,记录失败批次便于后续补采。

场景 2:未知商品 ID(按类目分页)

  • 先算总数:先请求 1 页 1 条数据,获取类目商品总数TotalProductCount,计算总页数;
  • 分页上限控制:平台限制最多返回 1000 页,代码中自动截断,避免无效请求;
  • Token 刷新:每采集 10 页刷新一次 Token(Token 有效期 1 小时),避免采集中途 Token 过期。

3. 批量采集容错机制

  • 单批 / 单页失败:仅记录异常,继续采集下一批 / 下一页,不终止整体流程;
  • 字段缺失容错:用safe_extract_field函数提取字段,缺失时返回默认值,避免KeyError中断解析;
  • 重试机制:核心请求添加 3 次指数退避重试,应对偶发的网络波动 / 服务器错误。

四、性能优化与避坑指南

1. 性能优化点

优化方向 具体措施
减少请求次数 单请求传入 20 个商品 ID(上限),而非单个 ID 请求;按类目分页时设置PageSize=20
内存优化 使用生成器拆分商品 ID 列表,避免一次性加载万级 ID 列表到内存
异步请求 高并发场景下,替换requestsaiohttp,结合限流实现异步批量请求
数据缓存 将采集结果缓存到 Redis / 本地文件,重复采集时优先读取缓存
Token 预刷新 每采集 10 页 / 500 个商品主动刷新 Token,避免 Token 过期导致批量采集中断

2. 常见坑点与解决

坑点 表现形式 解决方案
Token 过期 批量采集中途返回 401 错误 每 N 页主动刷新 Token;捕获 401 错误后自动重新获取 Token 并继续采集
类目分页超 1000 页 超过 1000 页后无数据返回 代码中限制最大页数为 1000;按子类目拆分采集(如电子产品拆分为手机、耳机等)
商品 ID 无效 单批中部分 ID 返回空数据 采集完成后对比「请求 ID 数」与「成功解析数」,输出无效 ID 列表便于排查
法语字符编码问题 商品标题乱码 确保 JSON 解析 / CSV 保存时使用 UTF-8 编码;响应文本解码指定response.encoding='utf-8'
限流触发 429 错误 请求被临时封禁 使用ratelimit库精准限流;触发 429 后休眠 10-30 分钟再重试

五、合规与生产环境适配

1. 合规要求

  • 数据用途:仅可用于授权范围内的业务(如店铺运营、市场分析),禁止转售、泄露商品数据;
  • 请求频率:严格遵守 100 次 / 分钟的限流规则,超量可能导致账号永久封禁;
  • 隐私保护:解析日志 / 存储数据时,脱敏卖家名称、SKU 参考码等敏感信息。

2. 生产环境适配

  • 异常监控:添加邮件 / 短信告警,批量采集失败率超过 10% 时及时通知开发者;
  • 断点续采:记录已采集的批次 / 页数,程序中断后可从断点继续采集,避免重复采集;
  • 分布式采集:多账号 / 多 IP 分布式采集时,每个账号独立限流,避免单账号超限;
  • 数据校验:采集完成后校验数据完整性(如价格、库存字段非负),过滤无效数据。

六、总结

Cdiscount 批量商品采集的核心是「精准限流 + 高效分页 + 全链路容错」:

  1. 限流:用ratelimit库确保不超 100 次 / 分钟,避免封禁;
  2. 分页:已知 ID 则分批(20 个 / 批),未知 ID 则按类目分页(先算总数再分页数);
  3. 容错:单批 / 单页失败不中断,字段缺失返回默认值,Token 过期自动刷新。

该方案可稳定支撑千级 / 万级商品批量采集,兼顾效率与合规,适配店铺运营、价格监控、库存统计等合法业务场景。

相关文章
|
3月前
|
JSON API 数据安全/隐私保护
Python采集淘宝拍立淘按图搜索API接口及JSON数据返回全流程指南
通过以上流程,可实现淘宝拍立淘按图搜索的完整调用链路,并获取结构化的JSON商品数据,支撑电商比价、智能推荐等业务场景。
|
4月前
|
JSON API 数据格式
干货满满!淘宝商品详情数据,淘宝API(json数据返回)
淘宝商品详情 API 接口(如 taobao.item.get)的 JSON 数据返回示例如下
|
3月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南
|
6月前
|
JSON API 数据格式
Python采集京东商品评论API接口示例,json数据返回
下面是一个使用Python采集京东商品评论的完整示例,包括API请求、JSON数据解析
|
2月前
|
存储 缓存 监控
基于淘宝商品详情 API 的竞品监控系统搭建:价格 / 库存 / 促销实时追踪
淘宝商品详情 API 的竞品监控系统搭建:价格 / 库存 / 促销实时追踪
|
3月前
|
存储 数据采集 监控
基于淘宝商品详情 API 的数据分析应用:如何构建商品价格波动与库存监控系统?
构建基于淘宝商品详情API的商品价格波动与库存监控系统,需围绕数据采集、存储、分析、告警、可视化五大核心模块展开。以下是分步骤的详细方案,结合技术实现与业务逻辑,确保系统高效、稳定、可扩展。
|
2月前
|
JSON 供应链 API
1688商品详情 API 接口系列(JSON 数据返回参考)
提供的核心 B2B 电商数据接口集合,聚焦 1688 平台商品全维度信息的标准化获取,支持 JSON 格式统一返回
|
4月前
|
JSON API 数据安全/隐私保护
Python采集淘宝评论API接口及JSON数据返回全流程指南
Python采集淘宝评论API接口及JSON数据返回全流程指南
|
4月前
|
人工智能 供应链 API
淘宝API商品详情接口全解析:从基础数据到深度挖掘
淘宝API商品详情接口不仅提供基础数据,更通过深度挖掘实现从数据到洞察的跨越。开发者需结合业务场景选择合适分析方法,利用AI标签、区块链溯源等新技术,最终实现数据驱动的电商业务创新。
|
4月前
|
机器学习/深度学习 JSON API
干货,淘宝拍立淘按图搜索,淘宝API(json数据返回)
淘宝拍立淘按图搜索API接口基于深度学习与计算机视觉技术,通过解析用户上传的商品图片,在淘宝商品库中实现毫秒级相似商品匹配,并以JSON格式返回商品标题、图片链接、价格、销量、相似度评分等详细信息。