Cdiscount 作为法国头部电商平台,其开放 API 对批量采集有明确的限流规则(默认 100 次 / 分钟)和分页限制(单请求最多返回 20 个商品),直接高频请求易触发封禁,分页低效则导致采集耗时过长。本文基于 Python 实现「合规限流 + 高效分页 + 批量容错」的完整采集方案,兼顾采集效率与平台规则,适配千级 / 万级商品批量采集场景。
一、核心前提(合规与基础信息)
1. 授权与 API 基础
- 已在 Cdiscount 开发者平台获取
Client ID/Client Secret,并通过 OAuth 2.0 获取有效 Access Token(有效期 1 小时); - 批量商品 API 端点:
https://api.cdiscount.com/OpenApi/json/GetProduct(支持单次传入 20 个商品 ID); - 类目批量查询 API(可选):
https://api.cdiscount.com/OpenApi/json/GetCategoryProducts(按类目分页采集商品 ID)。
2. 核心限制规则
| 限制类型 | 规则要求 | 违规后果 |
| 频率限流 | 100 次 / 分钟(≈0.6 秒 / 次) | 429 错误、临时封禁 10-30 分钟 |
| 单请求商品数 | 最多 20 个商品 ID | 超出部分 API 直接忽略 |
| 分页深度 | 类目分页最多返回 1000 页(2 万商品) | 超出页数无数据返回 |
| Token 有效期 | 1 小时 | 401 错误,请求失效 |
二、完整实现方案(Python)
1. 环境依赖
bash
运行
pip install requests # HTTP请求 pip install ratelimit # 精准限流(可选,替代手动延迟) pip install python-dotenv # 敏感信息管理
2. 核心代码(限流 + 分页 + 批量采集)
python
运行
import requests import json import time import math from typing import List, Dict, Optional, Generator import logging from ratelimit import limits, sleep_and_retry from dotenv import load_dotenv import os # -------------------------- 基础配置 -------------------------- # 加载.env文件(存储Client ID/Secret,避免硬编码) load_dotenv() CLIENT_ID = os.getenv("CDISCOUNT_CLIENT_ID") CLIENT_SECRET = os.getenv("CDISCOUNT_CLIENT_SECRET") # API地址 TOKEN_URL = "https://api.cdiscount.com/OpenApi/json/AccessToken" BATCH_PRODUCT_API = "https://api.cdiscount.com/OpenApi/json/GetProduct" CATEGORY_PRODUCT_API = "https://api.cdiscount.com/OpenApi/json/GetCategoryProducts" # 限流与分页配置 RATE_LIMIT = 100 # 每分钟最多请求数 RATE_LIMIT_PERIOD = 60 # 限流周期(秒) BATCH_SIZE = 20 # 单请求最大商品数 PAGE_SIZE = 20 # 类目分页单页商品数 RETRY_TIMES = 3 # 异常重试次数 TIMEOUT = 15 # 请求超时时间 # 日志配置 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("cdiscount_batch.log"), logging.StreamHandler()] ) logger = logging.getLogger(__name__) # -------------------------- 通用工具函数 -------------------------- def get_access_token() -> Optional[str]: """获取Token(带重试,有效期1小时)""" payload = {"parameters": {"Login": CLIENT_ID, "Password": CLIENT_SECRET}} for retry in range(RETRY_TIMES): try: response = requests.post( TOKEN_URL, data=json.dumps(payload), headers={"Content-Type": "application/json"}, timeout=TIMEOUT ) response.raise_for_status() token_data = response.json() if token_data.get("Success") and token_data.get("Token"): logger.info("Token获取成功") return token_data["Token"] else: logger.error(f"Token获取失败:{token_data.get('ErrorMessage', '未知错误')}") return None except Exception as e: logger.error(f"Token获取异常(重试{retry+1}/{RETRY_TIMES}):{str(e)}") if retry < RETRY_TIMES - 1: time.sleep(2 ** retry) return None @sleep_and_retry # 触发限流时自动休眠重试 @limits(calls=RATE_LIMIT, period=RATE_LIMIT_PERIOD) def limited_request(url: str, payload: Dict, headers: Dict) -> Optional[Dict]: """带精准限流的请求函数(确保不超100次/分钟)""" try: response = requests.post( url, data=json.dumps(payload), headers=headers, timeout=TIMEOUT ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"限流请求异常:{str(e)}") return None def safe_extract_field(data: Dict, field_path: str, default: any = None) -> any: """安全提取嵌套字段(容错字段缺失)""" keys = field_path.split(".") value = data for key in keys: if isinstance(value, dict) and key in value: value = value[key] else: return default return value # -------------------------- 方案1:指定商品ID批量采集(已知ID列表) -------------------------- def split_product_ids(product_ids: List[str]) -> Generator[List[str], None, None]: """ 拆分商品ID列表为批量(每批20个) :param product_ids: 完整商品ID列表 :return: 分批生成器(避免一次性加载大列表) """ for i in range(0, len(product_ids), BATCH_SIZE): yield product_ids[i:i + BATCH_SIZE] def batch_fetch_products_by_ids(product_ids: List[str], token: str) -> List[Dict]: """ 按商品ID批量采集(适配限流+分批) :param product_ids: 待采集商品ID列表(可上千/万条) :param token: 有效Token :return: 解析后的商品数据列表 """ if not product_ids: logger.warning("无商品ID可采集") return [] # 拆分ID列表为批量 batches = split_product_ids(product_ids) all_products = [] headers = {"Content-Type": "application/json"} for batch_idx, batch_ids in enumerate(batches, 1): logger.info(f"开始采集第{batch_idx}批商品,共{len(batch_ids)}个ID") # 构造批量请求参数 payload = { "Token": token, "Parameters": { "ProductIdList": batch_ids, "WithProductStock": True, "WithProductPrice": True, "WithProductAttribute": True } } # 带限流的请求 response_json = limited_request(BATCH_PRODUCT_API, payload, headers) if not response_json or not response_json.get("Success"): logger.error(f"第{batch_idx}批采集失败:{response_json.get('ErrorMessage', '未知错误') if response_json else '无响应'}") continue # 解析该批商品数据 batch_products = response_json.get("Products", []) for product in batch_products: parsed_product = { "商品ID": safe_extract_field(product, "Id", "未知ID"), "标题": safe_extract_field(product, "Name", "无标题"), "品牌": safe_extract_field(product, "Brand", "未知品牌"), "售价(€)": safe_extract_field(product, "Price", 0.0), "库存": safe_extract_field(product, "Stock", 0), "类目ID": safe_extract_field(product, "CategoryId", 0), "SKU数": len(safe_extract_field(product, "Skus", [])) } all_products.append(parsed_product) logger.info(f"第{batch_idx}批采集完成,新增{len(batch_products)}个商品数据") logger.info(f"批量采集完成,共获取{len(all_products)}/{len(product_ids)}个商品数据") return all_products # -------------------------- 方案2:按类目分页采集(未知ID,按类目批量获取) -------------------------- def get_category_product_total(category_id: int, token: str) -> int: """获取指定类目下商品总数(用于计算分页)""" payload = { "Token": token, "Parameters": { "CategoryId": category_id, "PageNumber": 1, "PageSize": 1, # 仅需获取总数,单页1条即可 "WithProductCount": True } } headers = {"Content-Type": "application/json"} response_json = limited_request(CATEGORY_PRODUCT_API, payload, headers) if not response_json or not response_json.get("Success"): logger.error(f"获取类目{category_id}商品总数失败") return 0 return safe_extract_field(response_json, "TotalProductCount", 0) def batch_fetch_products_by_category(category_id: int, token: str) -> List[Dict]: """ 按类目分页采集商品(高效分页+限流) :param category_id: Cdiscount类目ID(如电子产品类目ID) :param token: 有效Token :return: 类目下所有商品数据 """ # 第一步:获取类目商品总数,计算总页数 total_count = get_category_product_total(category_id, token) if total_count == 0: logger.warning(f"类目{category_id}无商品数据") return [] total_pages = math.ceil(total_count / PAGE_SIZE) # 限制最大页数(避免平台限制的1000页上限) total_pages = min(total_pages, 1000) logger.info(f"类目{category_id}共{total_count}个商品,需采集{total_pages}页") all_products = [] headers = {"Content-Type": "application/json"} # 第二步:分页采集(按页请求,带限流) for page in range(1, total_pages + 1): logger.info(f"开始采集类目{category_id}第{page}/{total_pages}页") payload = { "Token": token, "Parameters": { "CategoryId": category_id, "PageNumber": page, "PageSize": PAGE_SIZE, "WithProductStock": True, "WithProductPrice": True } } response_json = limited_request(CATEGORY_PRODUCT_API, payload, headers) if not response_json or not response_json.get("Success"): logger.error(f"类目{category_id}第{page}页采集失败") continue # 解析当前页商品 page_products = response_json.get("Products", []) for product in page_products: parsed_product = { "商品ID": safe_extract_field(product, "Id", "未知ID"), "标题": safe_extract_field(product, "Name", "无标题"), "售价(€)": safe_extract_field(product, "Price", 0.0), "库存": safe_extract_field(product, "Stock", 0), "类目ID": category_id } all_products.append(parsed_product) logger.info(f"类目{category_id}第{page}页采集完成,新增{len(page_products)}个商品") # 可选:每采集10页刷新一次Token(避免Token过期) if page % 10 == 0: new_token = get_access_token() if new_token: token = new_token logger.info("Token已刷新,继续采集") logger.info(f"类目{category_id}采集完成,共获取{len(all_products)}个商品数据") return all_products # -------------------------- 数据存储(可选,CSV/Excel) -------------------------- def save_products_to_csv(products: List[Dict], filename: str = "cdiscount_batch_products.csv"): """将批量采集数据保存为CSV(容错字段缺失)""" if not products: logger.warning("无数据可保存") return import csv # 动态获取所有字段(避免部分商品字段缺失) headers = set() for p in products: headers.update(p.keys()) headers = list(headers) try: with open(filename, "w", encoding="utf-8-sig", newline="") as f: writer = csv.DictWriter(f, fieldnames=headers, restval="") writer.writeheader() writer.writerows(products) logger.info(f"批量数据已保存到:{filename}") except Exception as e: logger.error(f"保存CSV失败:{str(e)}") # -------------------------- 主函数(测试调用) -------------------------- if __name__ == "__main__": # 1. 获取Token token = get_access_token() if not token: logger.critical("Token获取失败,终止程序") exit(1) # 场景1:已知商品ID列表,批量采集(示例:100个商品ID) # 实际使用时替换为真实商品ID列表 test_product_ids = [f"123456789{i}" for i in range(1, 101)] batch_products_by_ids = batch_fetch_products_by_ids(test_product_ids, token) save_products_to_csv(batch_products_by_ids, "cdiscount_batch_by_ids.csv") # 场景2:按类目分页采集(示例:电子产品类目ID,需替换为真实ID) # test_category_id = 1000 # 替换为真实类目ID # batch_products_by_category = batch_fetch_products_by_category(test_category_id, token) # save_products_to_csv(batch_products_by_category, "cdiscount_batch_by_category.csv")
三、核心方案拆解
1. 精准限流处理(避免触发平台封禁)
方案选型:
- 基础方案:手动添加
time.sleep(0.6)(100 次 / 分钟 = 0.6 秒 / 次),适合简单场景; - 进阶方案:使用
ratelimit库的@limits装饰器,精准控制每分钟请求数,触发限流时自动休眠重试(推荐生产环境使用)。
关键逻辑:
python
运行
@sleep_and_retry @limits(calls=100, period=60) # 每分钟最多100次请求 def limited_request(url: str, payload: Dict, headers: Dict) -> Optional[Dict]: # 请求逻辑...
@sleep_and_retry:当请求次数超出限流时,自动休眠至下一个周期再重试;- 无需手动计算延迟,库自动适配限流规则,避免人为计算误差。
2. 高效分页实现(两种场景适配)
场景 1:已知商品 ID 列表(精准批量)
- 分批策略:将大 ID 列表拆分为每批 20 个(平台单请求上限),通过生成器
split_product_ids分批返回,避免一次性加载大列表占用内存; - 容错处理:单批采集失败不中断整体流程,记录失败批次便于后续补采。
场景 2:未知商品 ID(按类目分页)
- 先算总数:先请求 1 页 1 条数据,获取类目商品总数
TotalProductCount,计算总页数; - 分页上限控制:平台限制最多返回 1000 页,代码中自动截断,避免无效请求;
- Token 刷新:每采集 10 页刷新一次 Token(Token 有效期 1 小时),避免采集中途 Token 过期。
3. 批量采集容错机制
- 单批 / 单页失败:仅记录异常,继续采集下一批 / 下一页,不终止整体流程;
- 字段缺失容错:用
safe_extract_field函数提取字段,缺失时返回默认值,避免KeyError中断解析; - 重试机制:核心请求添加 3 次指数退避重试,应对偶发的网络波动 / 服务器错误。
四、性能优化与避坑指南
1. 性能优化点
| 优化方向 | 具体措施 |
| 减少请求次数 | 单请求传入 20 个商品 ID(上限),而非单个 ID 请求;按类目分页时设置PageSize=20 |
| 内存优化 | 使用生成器拆分商品 ID 列表,避免一次性加载万级 ID 列表到内存 |
| 异步请求 | 高并发场景下,替换requests为aiohttp,结合限流实现异步批量请求 |
| 数据缓存 | 将采集结果缓存到 Redis / 本地文件,重复采集时优先读取缓存 |
| Token 预刷新 | 每采集 10 页 / 500 个商品主动刷新 Token,避免 Token 过期导致批量采集中断 |
2. 常见坑点与解决
| 坑点 | 表现形式 | 解决方案 |
| Token 过期 | 批量采集中途返回 401 错误 | 每 N 页主动刷新 Token;捕获 401 错误后自动重新获取 Token 并继续采集 |
| 类目分页超 1000 页 | 超过 1000 页后无数据返回 | 代码中限制最大页数为 1000;按子类目拆分采集(如电子产品拆分为手机、耳机等) |
| 商品 ID 无效 | 单批中部分 ID 返回空数据 | 采集完成后对比「请求 ID 数」与「成功解析数」,输出无效 ID 列表便于排查 |
| 法语字符编码问题 | 商品标题乱码 | 确保 JSON 解析 / CSV 保存时使用 UTF-8 编码;响应文本解码指定response.encoding='utf-8' |
| 限流触发 429 错误 | 请求被临时封禁 | 使用ratelimit库精准限流;触发 429 后休眠 10-30 分钟再重试 |
五、合规与生产环境适配
1. 合规要求
- 数据用途:仅可用于授权范围内的业务(如店铺运营、市场分析),禁止转售、泄露商品数据;
- 请求频率:严格遵守 100 次 / 分钟的限流规则,超量可能导致账号永久封禁;
- 隐私保护:解析日志 / 存储数据时,脱敏卖家名称、SKU 参考码等敏感信息。
2. 生产环境适配
- 异常监控:添加邮件 / 短信告警,批量采集失败率超过 10% 时及时通知开发者;
- 断点续采:记录已采集的批次 / 页数,程序中断后可从断点继续采集,避免重复采集;
- 分布式采集:多账号 / 多 IP 分布式采集时,每个账号独立限流,避免单账号超限;
- 数据校验:采集完成后校验数据完整性(如价格、库存字段非负),过滤无效数据。
六、总结
Cdiscount 批量商品采集的核心是「精准限流 + 高效分页 + 全链路容错」:
- 限流:用
ratelimit库确保不超 100 次 / 分钟,避免封禁; - 分页:已知 ID 则分批(20 个 / 批),未知 ID 则按类目分页(先算总数再分页数);
- 容错:单批 / 单页失败不中断,字段缺失返回默认值,Token 过期自动刷新。
该方案可稳定支撑千级 / 万级商品批量采集,兼顾效率与合规,适配店铺运营、价格监控、库存统计等合法业务场景。