【干货满满】电商API数据抓取实战:从商品信息到订单管理全链路实现

简介: 本文详解构建电商API数据抓取系统,涵盖商品采集、订单管理、防封策略、数据存储与分析,适用于价格监控、供应链管理等场景。

电商 API 数据抓取是构建价格监控、供应链管理、竞品分析等系统的基础。本文将通过完整案例,演示如何从商品信息采集到订单管理的全链路实现,包含防封禁策略、数据解析与持久化方案。
一、项目架构设计
系统整体架构
plaintext
┌───────────────────────────────────────────────────┐
│ 电商API数据抓取系统 │
└───────────────────────────┬───────────────────────┘

┌───────────────┼───────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 商品信息采集 │ │ 订单数据同步 │ │ 用户信息管理 │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 数据解析与清洗 │ │ 数据解析与清洗 │ │ 数据解析与清洗 │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 数据存储(MongoDB) │ 数据存储(MongoDB) │ 数据存储(MongoDB) │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
└───────────────┼───────────────┘

┌───────────────┐
│ 数据分析与应用 │
└───────────────┘
技术栈选择
编程语言:Python 3.8+
数据存储:MongoDB(灵活存储 JSON 数据)
异步框架:aiohttp(高并发 API 请求)
任务调度:APScheduler(定时同步数据)
数据可视化:Plotly Dash(可选)
二、商品信息采集模块实现

  1. 商品 API 客户端
    python
    运行
    import aiohttp
    import asyncio
    import hashlib
    import time
    import json
    import logging
    from functools import wraps

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(name)

class RetryException(Exception):
"""重试异常"""
pass

class APIClient:
def init(self, app_key, app_secret, base_url, max_retries=3, timeout=30):
"""初始化API客户端"""
self.app_key = app_key
self.app_secret = app_secret
self.base_url = base_url
self.max_retries = max_retries
self.timeout = timeout
self.session = None

async def init_session(self):
    """初始化aiohttp会话"""
    self.session = aiohttp.ClientSession()

async def close_session(self):
    """关闭aiohttp会话"""
    if self.session:
        await self.session.close()

def retry(func):
    """重试装饰器"""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        retries = 0
        while retries < self.max_retries:
            try:
                return await func(self, *args, **kwargs)
            except RetryException as e:
                logger.warning(f"请求重试 ({retries+1}/{self.max_retries}): {str(e)}")
                await asyncio.sleep(2 ** retries)  # 指数退避
                retries += 1
        logger.error(f"请求失败,已达到最大重试次数 ({self.max_retries})")
        return None
    return wrapper

def generate_sign(self, params):
    """生成API签名(示例为京东风格,实际需根据平台调整)"""
    sorted_params = sorted(params.items(), key=lambda x: x[0])
    sign_str = self.app_secret
    for k, v in sorted_params:
        sign_str += f"{k}{v}"
    sign_str += self.app_secret
    return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()

@retry
async def call(self, method, params=None):
    """调用API"""
    if params is None:
        params = {}

    # 添加公共参数
    common_params = {
        "app_key": self.app_key,
        "method": method,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "format": "json",
        "v": "2.0",
        "sign_method": "md5"
    }

    # 合并参数
    all_params = {**common_params, **params}

    # 生成签名
    sign = self.generate_sign(all_params)
    all_params["sign"] = sign

    # 发送请求
    try:
        async with self.session.post(
            self.base_url, 
            data=all_params, 
            timeout=self.timeout
        ) as response:
            if response.status != 200:
                raise RetryException(f"HTTP错误 {response.status}")

            result = await response.json()

            # 检查业务错误(根据平台API调整)
            if "error_response" in result:
                error_code = result["error_response"].get("code")
                error_msg = result["error_response"].get("msg")

                # 特定错误码需要重试
                if error_code in ["429", "500"]:
                    raise RetryException(f"API错误 {error_code}: {error_msg}")

                logger.error(f"API错误 {error_code}: {error_msg}")
                return None

            return result

    except Exception as e:
        raise RetryException(f"请求异常: {str(e)}")
AI 代码解读
  1. 商品信息采集器
    python
    运行
    class ProductCollector:
    def init(self, api_client, db_client, category_batch_size=10, product_batch_size=50):

     """初始化商品采集器"""
     self.api_client = api_client
     self.db_client = db_client
     self.category_batch_size = category_batch_size
     self.product_batch_size = product_batch_size
     self.collection_categories = db_client["categories"]
     self.collection_products = db_client["products"]
    
    AI 代码解读

    async def fetch_categories(self):

     """获取商品分类"""
     logger.info("开始获取商品分类...")
     method = "jingdong.category.read.findAttrsByCategoryId"  # 示例接口
    
     # 获取一级分类
     params = {"parentId": 0}
     result = await self.api_client.call(method, params)
    
     if not result:
         logger.error("获取分类失败")
         return
    
     categories = result.get("jingdong_category_read_findAttrsByCategoryId_response", {}).get("result", [])
    
     # 存储分类
     for category in categories:
         category["_id"] = category["id"]
         await self.collection_categories.update_one(
             {"_id": category["_id"]}, 
             {"$set": category}, 
             upsert=True
         )
    
     logger.info(f"成功获取 {len(categories)} 个一级分类")
     return categories
    
    AI 代码解读

    async def fetch_products_by_category(self, category_id, page=1, page_size=100):

     """根据分类获取商品"""
     method = "jingdong.search.v3.product.search"  # 示例接口
    
     params = {
         "categoryId": category_id,
         "page": page,
         "pageSize": page_size,
         "sortType": 1  # 按上架时间排序
     }
    
     result = await self.api_client.call(method, params)
    
     if not result:
         logger.error(f"获取分类 {category_id} 下的商品失败")
         return []
    
     products = result.get("jingdong_search_v3_product_search_response", {}).get("result", {}).get("productInfoList", [])
    
     # 存储商品
     for product in products:
         product["_id"] = product["skuId"]
         product["categoryId"] = category_id
         product["update_time"] = time.time()
    
         await self.collection_products.update_one(
             {"_id": product["_id"]}, 
             {"$set": product}, 
             upsert=True
         )
    
     logger.info(f"成功获取分类 {category_id} 下的 {len(products)} 个商品")
     return products
    
    AI 代码解读

    async def fetch_all_products(self, categories=None):

     """获取所有分类下的商品"""
     if not categories:
         categories = await self.fetch_categories()
    
     # 并发获取每个分类下的商品
     tasks = []
     for category in categories[:self.category_batch_size]:  # 限制并发分类数量
         category_id = category["id"]
         tasks.append(self.fetch_products_by_category(category_id))
    
     await asyncio.gather(*tasks)
     logger.info("商品采集完成")
    
    AI 代码解读

    三、订单管理模块实现

  2. 订单 API 客户端
    python
    运行
    class OrderClient(APIClient):
    def init(self, app_key, app_secret, base_url):

     """初始化订单API客户端"""
     super().__init__(app_key, app_secret, base_url)
    
    AI 代码解读

    async def get_order_list(self, start_time, end_time, page=1, page_size=50):

     """获取订单列表"""
     method = "jingdong.etms.order.info.get"  # 示例接口
    
     params = {
         "startDate": start_time,
         "endDate": end_time,
         "page": page,
         "pageSize": page_size
     }
    
     return await self.call(method, params)
    
    AI 代码解读

    async def get_order_detail(self, order_id):

     """获取订单详情"""
     method = "jingdong.order.get"  # 示例接口
    
     params = {
         "orderId": order_id
     }
    
     return await self.call(method, params)
    
    AI 代码解读
  3. 订单同步器
    python
    运行
    class OrderSynchronizer:
    def init(self, order_client, db_client):

     """初始化订单同步器"""
     self.order_client = order_client
     self.db_client = db_client
     self.collection_orders = db_client["orders"]
     self.collection_order_items = db_client["order_items"]
    
    AI 代码解读

    async def sync_orders(self, start_time, end_time):

     """同步指定时间范围内的订单"""
     logger.info(f"开始同步订单,时间范围: {start_time} - {end_time}")
    
     page = 1
     has_more = True
    
     while has_more:
         # 获取订单列表
         result = await self.order_client.get_order_list(start_time, end_time, page)
    
         if not result:
             logger.error(f"获取订单列表失败,第 {page} 页")
             break
    
         orders = result.get("jingdong_etms_order_info_get_response", {}).get("result", {}).get("orderInfoList", [])
    
         if not orders:
             has_more = False
             break
    
         # 处理订单
         for order in orders:
             await self.process_order(order)
    
         logger.info(f"成功同步第 {page} 页,{len(orders)} 个订单")
         page += 1
    
     logger.info(f"订单同步完成,共 {page-1} 页")
    
    AI 代码解读

    async def process_order(self, order):

     """处理单个订单"""
     order_id = order["orderId"]
    
     # 获取订单详情
     detail = await self.order_client.get_order_detail(order_id)
    
     if not detail:
         logger.error(f"获取订单 {order_id} 详情失败")
         return
    
     order_detail = detail.get("jingdong_order_get_response", {}).get("orderInfo", {})
    
     # 存储订单主信息
     order_detail["_id"] = order_id
     order_detail["update_time"] = time.time()
    
     await self.collection_orders.update_one(
         {"_id": order_id},
         {"$set": order_detail},
         upsert=True
     )
    
     # 存储订单商品项
     items = order_detail.get("orderItemList", [])
     for item in items:
         item["_id"] = f"{order_id}_{item['skuId']}"
         item["orderId"] = order_id
    
         await self.collection_order_items.update_one(
             {"_id": item["_id"]},
             {"$set": item},
             upsert=True
         )
    
     logger.info(f"成功处理订单 {order_id},包含 {len(items)} 个商品项")
    
    AI 代码解读

    四、数据存储与分析

  4. MongoDB 连接
    python
    运行
    import motor.motor_asyncio

class Database:
def init(self, host="localhost", port=27017, db_name="ecommerce_data"):
"""初始化数据库连接"""
self.client = motor.motor_asyncio.AsyncIOMotorClient(host, port)
self.db = self.client[db_name]

def __getitem__(self, collection_name):
    """获取集合"""
    return self.db[collection_name]

async def close(self):
    """关闭数据库连接"""
    self.client.close()
AI 代码解读
  1. 数据分析示例
    python
    运行
    class DataAnalyzer:
    def init(self, db_client):

     """初始化数据分析器"""
     self.db_client = db_client
     self.collection_products = db_client["products"]
     self.collection_orders = db_client["orders"]
    
    AI 代码解读

    async def get_top_categories(self, limit=10):

     """获取销量最高的分类"""
     pipeline = [
         {
             "$lookup": {
                 "from": "products",
                 "localField": "orderItemList.skuId",
                 "foreignField": "skuId",
                 "as": "product_info"
             }
         },
         {
             "$unwind": "$product_info"
         },
         {
             "$group": {
                 "_id": "$product_info.categoryName",
                 "total_sales": {"$sum": "$orderItemList.quantity"}
             }
         },
         {
             "$sort": {"total_sales": -1}
         },
         {
             "$limit": limit
         }
     ]
    
     return await self.collection_orders.aggregate(pipeline).to_list(None)
    
    AI 代码解读

    async def analyze_price_trend(self, sku_id, days=30):

     """分析商品价格趋势"""
     # 假设我们定期采集商品价格,存储在products集合中
     pipeline = [
         {
             "$match": {
                 "skuId": sku_id,
                 "update_time": {"$gte": time.time() - days * 24 * 3600}
             }
         },
         {
             "$project": {
                 "price": 1,
                 "update_time": 1,
                 "date": {"$toDate": {"$multiply": ["$update_time", 1000]}}
             }
         },
         {
             "$sort": {"update_time": 1}
         }
     ]
    
     return await self.collection_products.aggregate(pipeline).to_list(None)
    
    AI 代码解读

    五、防封禁策略与性能优化

  2. 请求限流
    python
    运行
    from asyncio import Semaphore

class RateLimiter:
def init(self, rate=10, period=1):
"""初始化限流器"""
self.rate = rate # 每秒请求数
self.period = period # 时间窗口(秒)
self.semaphore = Semaphore(rate)
self.requests = []

async def __aenter__(self):
    """进入上下文时获取许可"""
    await self.semaphore.acquire()
    self.requests.append(time.time())
    self._cleanup_old_requests()
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """离开上下文时释放许可"""
    self.semaphore.release()

def _cleanup_old_requests(self):
    """清理时间窗口外的请求记录"""
    cutoff = time.time() - self.period
    self.requests = [t for t in self.requests if t >= cutoff]
AI 代码解读
  1. 随机延迟
    python
    运行
    import random

async def random_delay(min_delay=0.5, max_delay=2.0):
"""添加随机延迟,避免规律性请求"""
delay = random.uniform(min_delay, max_delay)
await asyncio.sleep(delay)
六、主程序与调度

  1. 主程序
    python
    运行
    async def main():
    """主程序入口"""
    配置信息
    config = {

     "app_key": "你的AppKey",
     "app_secret": "你的AppSecret",
     "api_base_url": "https://api.jd.com/routerjson",  # 示例为京东API
     "db_host": "localhost",
     "db_port": 27017,
     "db_name": "ecommerce_data"
    
    AI 代码解读

    }

    初始化组件
    db = Database(config["db_host"], config["db_port"], config["db_name"])
    api_client = APIClient(config["app_key"], config["app_secret"], config["api_base_url"])
    await api_client.init_session()

    商品采集
    product_collector = ProductCollector(api_client, db)
    await product_collector.fetch_all_products()

    订单同步(示例:同步最近7天的订单)
    end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 7 24 3600))

    order_client = OrderClient(config["app_key"], config["app_secret"], config["api_base_url"])
    order_synchronizer = OrderSynchronizer(order_client, db)
    await order_synchronizer.sync_orders(start_time, end_time)

    数据分析示例
    data_analyzer = DataAnalyzer(db)
    top_categories = await data_analyzer.get_top_categories()
    print("销量最高的分类:")
    for category in top_categories:

     print(f"{category['_id']}: {category['total_sales']}件")
    
    AI 代码解读

    清理资源
    await api_client.close_session()
    await db.close()

if name == "main":
asyncio.run(main())

  1. 定时任务调度
    python
    运行
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def scheduled_tasks():
"""定时任务调度"""
scheduler = AsyncIOScheduler()

# 每天凌晨2点同步商品信息
scheduler.add_job(main, 'cron', hour=2, minute=0)

# 每小时同步一次订单
scheduler.add_job(order_synchronizer.sync_orders, 'interval', hours=1, 
                 args=[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 24 * 3600)),
                       time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())])

scheduler.start()

# 保持程序运行
try:
    await asyncio.Event().wait()
except KeyboardInterrupt:
    scheduler.shutdown()
AI 代码解读

七、项目部署与监控

  1. 部署方案
    容器化部署:使用 Docker 容器打包应用,方便部署和扩展
    云服务:部署到阿里云、腾讯云等云平台,利用弹性计算资源
    守护进程:使用 systemd 或 supervisor 管理进程,确保服务高可用
  2. 监控方案
    日志监控:使用 ELK Stack(Elasticsearch, Logstash, Kibana)收集和分析日志
    性能监控:使用 Prometheus 和 Grafana 监控系统性能指标
    异常告警:设置邮件、短信或钉钉告警,及时发现和处理问题
    八、总结
    通过本实战项目,我们实现了从商品信息采集到订单管理的全链路电商 API 数据抓取系统,主要包括:
    API 请求模块:封装了安全的 API 调用逻辑,包括签名生成、错误重试
    数据采集模块:实现了商品分类和商品信息的采集
    订单管理模块:完成了订单数据的同步和处理
    数据存储与分析:使用 MongoDB 存储非结构化数据,并提供基础分析功能
    防封禁策略:实现了请求限流和随机延迟,降低被封禁风险
    定时任务:使用 APScheduler 实现了自动化的数据同步
    这个系统可以作为电商数据分析、价格监控、供应链管理等应用的基础,根据实际需求可以进一步扩展功能
目录
打赏
0
6
6
0
9
分享
相关文章
【干货满满】电商平台API接口用python调用脚本
这是一个支持淘宝、京东、拼多多、亚马逊等主流电商平台的通用 API 调用 Python 脚本框架,适配 doubao 使用。脚本封装了签名验证、请求处理、异常捕获及限流控制等核心功能,提供统一接口调用方式,便于开发者快速集成与扩展。
电商API数据接口深度分析
电商 API 是连接平台、商家与用户的核心枢纽,其设计直接影响数据流通效率与系统稳定性。本文从技术架构、性能优化、安全防护、合规治理等维度深度解析,结合淘宝、京东等头部平台实践,提供高并发场景下的多级缓存、异步处理、智能限流等落地解决方案,助力企业构建高效、安全、合规的 API 体系,推动电商系统智能化演进。
干货分享“对接的 API 总是不稳定,网络分层模型” 看电商 API 故障的本质
本文从 OSI 七层网络模型出发,深入剖析电商 API 不稳定的根本原因,涵盖物理层到应用层的典型故障与解决方案,结合阿里、京东等大厂架构,详解如何构建高稳定性的电商 API 通信体系。
电商 API 场景中,电商平台将核心完整诊断、分析和优化过程
某头部电商平台通过分阶段性能优化,将核心 API 的 QPS 从 100 提升至 1000。优化涵盖架构、应用、代码和运维四层,包括引入 API 网关、数据库分库分表、多级缓存、异步化改造、序列化优化、容器化弹性伸缩等关键手段,并结合 Jaeger、Prometheus、wrk 等工具进行性能诊断与监控。最终平均响应时间下降 4.7 倍,错误率降低 15 倍,资源使用率显著下降,系统稳定性与吞吐能力大幅提升。
2025电商API新特性:实时数据流、GraphQL接口与隐私合规
2025年电商API迎来技术与合规双重革新,实时数据流、GraphQL接口、隐私合规成为核心突破方向,推动全息电商、动态定价、供应链协同等场景升级,实现性能优化与用户隐私保护的协同发展。
|
17天前
|
入门到精通:电商API的全栈开发指南
本指南系统讲解电商API全栈开发,涵盖入门、进阶与精通三阶段,内容包括RESTful API设计、JWT认证、数据库集成、Redis缓存及第三方支付接入,结合Python Flask实战示例,助你从零构建高效可扩展的电商平台。
28 0
|
17天前
|
API如何支持电商多渠道销售
在数字化时代,电商企业面临多渠道销售带来的库存同步、订单处理等挑战。API作为系统间的“桥梁”,通过标准化接口实现数据自动同步与流程优化,助力企业高效管理多渠道运营,提升销售效率与客户体验。本文详解API如何支持电商实现无缝集成与持续增长。
43 0
API:电商的“魔法棒”,一键激活店铺管理潜能
在电商运营中,API如同“魔法棒”,实现库存同步、订单聚合与智能营销,大幅提升效率。通过自动化操作,商家可告别繁琐人工流程,迈向智能化管理新时代。
43 0
电商API 接口是什么?怎么使用API?
电商API是电商平台提供的数据接口,允许第三方工具与其系统交互,实现订单管理、库存同步、数据分析等自动化操作。通过API,卖家可高效管理多平台业务,提升运营效率。

热门文章

最新文章

下一篇
BFE 初探
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问