电商 API 数据抓取是构建价格监控、供应链管理、竞品分析等系统的基础。本文将通过完整案例,演示如何从商品信息采集到订单管理的全链路实现,包含防封禁策略、数据解析与持久化方案。
一、项目架构设计
系统整体架构
plaintext
┌───────────────────────────────────────────────────┐
│ 电商API数据抓取系统 │
└───────────────────────────┬───────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 商品信息采集 │ │ 订单数据同步 │ │ 用户信息管理 │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 数据解析与清洗 │ │ 数据解析与清洗 │ │ 数据解析与清洗 │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 数据存储(MongoDB) │ 数据存储(MongoDB) │ 数据存储(MongoDB) │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
└───────────────┼───────────────┘
│
┌───────────────┐
│ 数据分析与应用 │
└───────────────┘
技术栈选择
编程语言:Python 3.8+
数据存储:MongoDB(灵活存储 JSON 数据)
异步框架:aiohttp(高并发 API 请求)
任务调度:APScheduler(定时同步数据)
数据可视化:Plotly Dash(可选)
二、商品信息采集模块实现
- 商品 API 客户端
python
运行
import aiohttp
import asyncio
import hashlib
import time
import json
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(name)
class RetryException(Exception):
"""重试异常"""
pass
class APIClient:
def init(self, app_key, app_secret, base_url, max_retries=3, timeout=30):
"""初始化API客户端"""
self.app_key = app_key
self.app_secret = app_secret
self.base_url = base_url
self.max_retries = max_retries
self.timeout = timeout
self.session = None
async def init_session(self):
"""初始化aiohttp会话"""
self.session = aiohttp.ClientSession()
async def close_session(self):
"""关闭aiohttp会话"""
if self.session:
await self.session.close()
def retry(func):
"""重试装饰器"""
@wraps(func)
async def wrapper(self, *args, **kwargs):
retries = 0
while retries < self.max_retries:
try:
return await func(self, *args, **kwargs)
except RetryException as e:
logger.warning(f"请求重试 ({retries+1}/{self.max_retries}): {str(e)}")
await asyncio.sleep(2 ** retries) # 指数退避
retries += 1
logger.error(f"请求失败,已达到最大重试次数 ({self.max_retries})")
return None
return wrapper
def generate_sign(self, params):
"""生成API签名(示例为京东风格,实际需根据平台调整)"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = self.app_secret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.app_secret
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
@retry
async def call(self, method, params=None):
"""调用API"""
if params is None:
params = {}
# 添加公共参数
common_params = {
"app_key": self.app_key,
"method": method,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"format": "json",
"v": "2.0",
"sign_method": "md5"
}
# 合并参数
all_params = {**common_params, **params}
# 生成签名
sign = self.generate_sign(all_params)
all_params["sign"] = sign
# 发送请求
try:
async with self.session.post(
self.base_url,
data=all_params,
timeout=self.timeout
) as response:
if response.status != 200:
raise RetryException(f"HTTP错误 {response.status}")
result = await response.json()
# 检查业务错误(根据平台API调整)
if "error_response" in result:
error_code = result["error_response"].get("code")
error_msg = result["error_response"].get("msg")
# 特定错误码需要重试
if error_code in ["429", "500"]:
raise RetryException(f"API错误 {error_code}: {error_msg}")
logger.error(f"API错误 {error_code}: {error_msg}")
return None
return result
except Exception as e:
raise RetryException(f"请求异常: {str(e)}")
AI 代码解读
商品信息采集器
python
运行
class ProductCollector:
def init(self, api_client, db_client, category_batch_size=10, product_batch_size=50):"""初始化商品采集器""" self.api_client = api_client self.db_client = db_client self.category_batch_size = category_batch_size self.product_batch_size = product_batch_size self.collection_categories = db_client["categories"] self.collection_products = db_client["products"]
AI 代码解读async def fetch_categories(self):
"""获取商品分类""" logger.info("开始获取商品分类...") method = "jingdong.category.read.findAttrsByCategoryId" # 示例接口 # 获取一级分类 params = {"parentId": 0} result = await self.api_client.call(method, params) if not result: logger.error("获取分类失败") return categories = result.get("jingdong_category_read_findAttrsByCategoryId_response", {}).get("result", []) # 存储分类 for category in categories: category["_id"] = category["id"] await self.collection_categories.update_one( {"_id": category["_id"]}, {"$set": category}, upsert=True ) logger.info(f"成功获取 {len(categories)} 个一级分类") return categories
AI 代码解读async def fetch_products_by_category(self, category_id, page=1, page_size=100):
"""根据分类获取商品""" method = "jingdong.search.v3.product.search" # 示例接口 params = { "categoryId": category_id, "page": page, "pageSize": page_size, "sortType": 1 # 按上架时间排序 } result = await self.api_client.call(method, params) if not result: logger.error(f"获取分类 {category_id} 下的商品失败") return [] products = result.get("jingdong_search_v3_product_search_response", {}).get("result", {}).get("productInfoList", []) # 存储商品 for product in products: product["_id"] = product["skuId"] product["categoryId"] = category_id product["update_time"] = time.time() await self.collection_products.update_one( {"_id": product["_id"]}, {"$set": product}, upsert=True ) logger.info(f"成功获取分类 {category_id} 下的 {len(products)} 个商品") return products
AI 代码解读async def fetch_all_products(self, categories=None):
"""获取所有分类下的商品""" if not categories: categories = await self.fetch_categories() # 并发获取每个分类下的商品 tasks = [] for category in categories[:self.category_batch_size]: # 限制并发分类数量 category_id = category["id"] tasks.append(self.fetch_products_by_category(category_id)) await asyncio.gather(*tasks) logger.info("商品采集完成")
AI 代码解读三、订单管理模块实现
订单 API 客户端
python
运行
class OrderClient(APIClient):
def init(self, app_key, app_secret, base_url):"""初始化订单API客户端""" super().__init__(app_key, app_secret, base_url)
AI 代码解读async def get_order_list(self, start_time, end_time, page=1, page_size=50):
"""获取订单列表""" method = "jingdong.etms.order.info.get" # 示例接口 params = { "startDate": start_time, "endDate": end_time, "page": page, "pageSize": page_size } return await self.call(method, params)
AI 代码解读async def get_order_detail(self, order_id):
"""获取订单详情""" method = "jingdong.order.get" # 示例接口 params = { "orderId": order_id } return await self.call(method, params)
AI 代码解读订单同步器
python
运行
class OrderSynchronizer:
def init(self, order_client, db_client):"""初始化订单同步器""" self.order_client = order_client self.db_client = db_client self.collection_orders = db_client["orders"] self.collection_order_items = db_client["order_items"]
AI 代码解读async def sync_orders(self, start_time, end_time):
"""同步指定时间范围内的订单""" logger.info(f"开始同步订单,时间范围: {start_time} - {end_time}") page = 1 has_more = True while has_more: # 获取订单列表 result = await self.order_client.get_order_list(start_time, end_time, page) if not result: logger.error(f"获取订单列表失败,第 {page} 页") break orders = result.get("jingdong_etms_order_info_get_response", {}).get("result", {}).get("orderInfoList", []) if not orders: has_more = False break # 处理订单 for order in orders: await self.process_order(order) logger.info(f"成功同步第 {page} 页,{len(orders)} 个订单") page += 1 logger.info(f"订单同步完成,共 {page-1} 页")
AI 代码解读async def process_order(self, order):
"""处理单个订单""" order_id = order["orderId"] # 获取订单详情 detail = await self.order_client.get_order_detail(order_id) if not detail: logger.error(f"获取订单 {order_id} 详情失败") return order_detail = detail.get("jingdong_order_get_response", {}).get("orderInfo", {}) # 存储订单主信息 order_detail["_id"] = order_id order_detail["update_time"] = time.time() await self.collection_orders.update_one( {"_id": order_id}, {"$set": order_detail}, upsert=True ) # 存储订单商品项 items = order_detail.get("orderItemList", []) for item in items: item["_id"] = f"{order_id}_{item['skuId']}" item["orderId"] = order_id await self.collection_order_items.update_one( {"_id": item["_id"]}, {"$set": item}, upsert=True ) logger.info(f"成功处理订单 {order_id},包含 {len(items)} 个商品项")
AI 代码解读四、数据存储与分析
- MongoDB 连接
python
运行
import motor.motor_asyncio
class Database:
def init(self, host="localhost", port=27017, db_name="ecommerce_data"):
"""初始化数据库连接"""
self.client = motor.motor_asyncio.AsyncIOMotorClient(host, port)
self.db = self.client[db_name]
def __getitem__(self, collection_name):
"""获取集合"""
return self.db[collection_name]
async def close(self):
"""关闭数据库连接"""
self.client.close()
AI 代码解读
数据分析示例
python
运行
class DataAnalyzer:
def init(self, db_client):"""初始化数据分析器""" self.db_client = db_client self.collection_products = db_client["products"] self.collection_orders = db_client["orders"]
AI 代码解读async def get_top_categories(self, limit=10):
"""获取销量最高的分类""" pipeline = [ { "$lookup": { "from": "products", "localField": "orderItemList.skuId", "foreignField": "skuId", "as": "product_info" } }, { "$unwind": "$product_info" }, { "$group": { "_id": "$product_info.categoryName", "total_sales": {"$sum": "$orderItemList.quantity"} } }, { "$sort": {"total_sales": -1} }, { "$limit": limit } ] return await self.collection_orders.aggregate(pipeline).to_list(None)
AI 代码解读async def analyze_price_trend(self, sku_id, days=30):
"""分析商品价格趋势""" # 假设我们定期采集商品价格,存储在products集合中 pipeline = [ { "$match": { "skuId": sku_id, "update_time": {"$gte": time.time() - days * 24 * 3600} } }, { "$project": { "price": 1, "update_time": 1, "date": {"$toDate": {"$multiply": ["$update_time", 1000]}} } }, { "$sort": {"update_time": 1} } ] return await self.collection_products.aggregate(pipeline).to_list(None)
AI 代码解读五、防封禁策略与性能优化
- 请求限流
python
运行
from asyncio import Semaphore
class RateLimiter:
def init(self, rate=10, period=1):
"""初始化限流器"""
self.rate = rate # 每秒请求数
self.period = period # 时间窗口(秒)
self.semaphore = Semaphore(rate)
self.requests = []
async def __aenter__(self):
"""进入上下文时获取许可"""
await self.semaphore.acquire()
self.requests.append(time.time())
self._cleanup_old_requests()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""离开上下文时释放许可"""
self.semaphore.release()
def _cleanup_old_requests(self):
"""清理时间窗口外的请求记录"""
cutoff = time.time() - self.period
self.requests = [t for t in self.requests if t >= cutoff]
AI 代码解读
- 随机延迟
python
运行
import random
async def random_delay(min_delay=0.5, max_delay=2.0):
"""添加随机延迟,避免规律性请求"""
delay = random.uniform(min_delay, max_delay)
await asyncio.sleep(delay)
六、主程序与调度
主程序
python
运行
async def main():
"""主程序入口"""
配置信息
config = {"app_key": "你的AppKey", "app_secret": "你的AppSecret", "api_base_url": "https://api.jd.com/routerjson", # 示例为京东API "db_host": "localhost", "db_port": 27017, "db_name": "ecommerce_data"
AI 代码解读}
初始化组件
db = Database(config["db_host"], config["db_port"], config["db_name"])
api_client = APIClient(config["app_key"], config["app_secret"], config["api_base_url"])
await api_client.init_session()商品采集
product_collector = ProductCollector(api_client, db)
await product_collector.fetch_all_products()订单同步(示例:同步最近7天的订单)
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 7 24 3600))order_client = OrderClient(config["app_key"], config["app_secret"], config["api_base_url"])
order_synchronizer = OrderSynchronizer(order_client, db)
await order_synchronizer.sync_orders(start_time, end_time)数据分析示例
data_analyzer = DataAnalyzer(db)
top_categories = await data_analyzer.get_top_categories()
print("销量最高的分类:")
for category in top_categories:print(f"{category['_id']}: {category['total_sales']}件")
AI 代码解读清理资源
await api_client.close_session()
await db.close()
if name == "main":
asyncio.run(main())
- 定时任务调度
python
运行
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def scheduled_tasks():
"""定时任务调度"""
scheduler = AsyncIOScheduler()
# 每天凌晨2点同步商品信息
scheduler.add_job(main, 'cron', hour=2, minute=0)
# 每小时同步一次订单
scheduler.add_job(order_synchronizer.sync_orders, 'interval', hours=1,
args=[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 24 * 3600)),
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())])
scheduler.start()
# 保持程序运行
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
scheduler.shutdown()
AI 代码解读
七、项目部署与监控
- 部署方案
容器化部署:使用 Docker 容器打包应用,方便部署和扩展
云服务:部署到阿里云、腾讯云等云平台,利用弹性计算资源
守护进程:使用 systemd 或 supervisor 管理进程,确保服务高可用 - 监控方案
日志监控:使用 ELK Stack(Elasticsearch, Logstash, Kibana)收集和分析日志
性能监控:使用 Prometheus 和 Grafana 监控系统性能指标
异常告警:设置邮件、短信或钉钉告警,及时发现和处理问题
八、总结
通过本实战项目,我们实现了从商品信息采集到订单管理的全链路电商 API 数据抓取系统,主要包括:
API 请求模块:封装了安全的 API 调用逻辑,包括签名生成、错误重试
数据采集模块:实现了商品分类和商品信息的采集
订单管理模块:完成了订单数据的同步和处理
数据存储与分析:使用 MongoDB 存储非结构化数据,并提供基础分析功能
防封禁策略:实现了请求限流和随机延迟,降低被封禁风险
定时任务:使用 APScheduler 实现了自动化的数据同步
这个系统可以作为电商数据分析、价格监控、供应链管理等应用的基础,根据实际需求可以进一步扩展功能