苏宁商品详情 API 与库存 API 联动:实时库存同步与预警机制实现
苏宁商品详情 API(核心字段)与库存 API(实时库存数据)的联动是电商运营、库存管理的核心需求,二者结合可实现「商品基础信息 + 实时库存」的全维度数据同步,并基于库存阈值触发预警。本文基于 Python 实现双 API 联动调用、实时库存同步逻辑,以及多维度库存预警机制(阈值预警、区域库存预警、SKU 库存预警),适配企业级库存管理场景。
一、核心背景:双 API 联动基础
1. 接口基础信息
| 接口类型 | 接口名称 | 核心端点(示例) | 核心字段 | 调用频率限制 |
| 商品详情 API | suning.custom.product.get | https://open.suning.com/api/http/sopRequest | 商品 ID、名称、类目、SKU 列表 | 100 次 / 分钟 |
| 库存 API | suning.custom.stock.get | https://open.suning.com/api/http/sopRequest | 商品 /sku 库存数、区域库存、库存状态 | 100 次 / 分钟 |
2. 双 API 关联逻辑
- 主键关联:商品详情 API 的
productCode(商品主 ID)/skuCode(SKU ID)是库存 API 的核心入参; - 数据互补:商品详情 API 提供 SKU 属性(如颜色、存储),库存 API 提供该 SKU 的实时库存,二者结合实现「规格 + 库存」的完整数据;
- 调用顺序:先调用商品详情 API 获取商品 / SKU 列表,再批量调用库存 API 获取实时库存,避免无效库存查询。
3. 核心需求场景
- 实时库存同步:将商品基础信息与实时库存绑定,更新至本地数据库 / 报表;
- 库存预警:库存低于阈值、区域库存缺货、核心 SKU 无货时触发告警(邮件 / 短信 / 企业微信);
- 库存监控:跟踪库存变化趋势,识别异常波动(如库存骤降、无货转有货)。
二、环境准备与基础配置
1. 依赖安装
bash
运行
pip install requests # API调用 pip install hashlib # 签名生成(内置,无需额外安装) pip install time # 时间戳/延迟(内置) pip install smtplib # 邮件预警(内置) pip install python-dotenv # 敏感信息管理
2. 基础配置(.env 文件)
env
# 苏宁开放平台密钥 APP_KEY=你的苏宁appKey APP_SECRET=你的苏宁appSecret # 库存预警配置 WARNING_THRESHOLD=50 # 全局库存预警阈值(库存<50触发预警) CRITICAL_THRESHOLD=10 # 紧急预警阈值(库存<10触发紧急告警) # 邮件预警配置 SMTP_HOST=smtp.163.com SMTP_PORT=465 SMTP_USER=你的邮箱@163.com SMTP_PASS=你的邮箱授权码 ALERT_RECEIVERS=接收人邮箱1,接收人邮箱2 # 区域库存监控(如上海、北京) MONITOR_AREAS=310000,110000 # 区域编码(上海:310000,北京:110000)
三、核心实现代码
1. 工具函数(签名、请求、字段提取)
python
运行
import requests import hashlib import time import json import smtplib from email.mime.text import MIMEText from email.header import Header from typing import Dict, List, Optional, Any import logging from dotenv import load_dotenv import os # 加载配置 load_dotenv() # 日志配置 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("suning_stock_sync.log"), logging.StreamHandler()] ) logger = logging.getLogger(__name__) # 基础配置 APP_KEY = os.getenv("APP_KEY") APP_SECRET = os.getenv("APP_SECRET") PRODUCT_API_URL = "https://open.suning.com/api/http/sopRequest" STOCK_API_URL = "https://open.suning.com/api/http/sopRequest" TIMEOUT = 15 RETRY_TIMES = 3 RATE_LIMIT_DELAY = 0.6 # 限流延迟(100次/分钟) # 预警配置 WARNING_THRESHOLD = int(os.getenv("WARNING_THRESHOLD", 50)) CRITICAL_THRESHOLD = int(os.getenv("CRITICAL_THRESHOLD", 10)) MONITOR_AREAS = os.getenv("MONITOR_AREAS", "").split(",") # 邮件配置 SMTP_HOST = os.getenv("SMTP_HOST") SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) SMTP_USER = os.getenv("SMTP_USER") SMTP_PASS = os.getenv("SMTP_PASS") ALERT_RECEIVERS = os.getenv("ALERT_RECEIVERS", "").split(",") def generate_sign(params: Dict, app_secret: str) -> str: """生成苏宁API签名(MD5)""" sorted_params = sorted(params.items(), key=lambda x: x[0]) param_str = "".join([f"{k}{v}" for k, v in sorted_params]) sign_str = f"{app_secret}{param_str}{app_secret}" md5 = hashlib.md5() md5.update(sign_str.encode("utf-8")) return md5.hexdigest().upper() def safe_extract_field(data: Dict, field_path: str, default: Any = None) -> Any: """安全提取嵌套字段""" keys = field_path.split(".") value = data for key in keys: if isinstance(value, dict) and key in value: value = value[key] else: return default return value def suning_api_request(method: str, params: Dict) -> Optional[Dict]: """通用苏宁API请求函数(带重试、限流)""" # 基础参数 base_params = { "appKey": APP_KEY, "timestamp": str(int(time.time() * 1000)), "format": "json", "version": "v1.0", "method": method } # 合并业务参数 base_params.update(params) # 生成签名 base_params["sign"] = generate_sign(base_params, APP_SECRET) for retry in range(RETRY_TIMES): try: time.sleep(RATE_LIMIT_DELAY) # 限流延迟 response = requests.post( PRODUCT_API_URL if "product" in method else STOCK_API_URL, data=base_params, timeout=TIMEOUT, headers={"Content-Type": "application/x-www-form-urlencoded"} ) response.raise_for_status() result = response.json() # 业务状态码校验 if result.get("code") != "0000": logger.error(f"API调用失败:{result.get('msg')},参数:{params}") return None return result except requests.exceptions.Timeout: logger.error(f"请求超时(重试{retry+1}/{RETRY_TIMES})") time.sleep(2 ** retry) except requests.exceptions.HTTPError as e: if response.status_code == 429: logger.warning("触发限流,休眠60秒") time.sleep(60) continue logger.error(f"HTTP错误:{e}") return None except Exception as e: logger.error(f"请求异常(重试{retry+1}/{RETRY_TIMES}):{str(e)}") time.sleep(2 ** retry) logger.error(f"API请求失败(重试{RETRY_TIMES}次),参数:{params}") return None
2. 双 API 联动:商品 + 库存数据同步
python
运行
def get_product_base_info(product_code: str) -> Optional[Dict]: """调用商品详情API,获取商品基础信息+SKU列表""" method = "suning.custom.product.get" params = {"productCode": product_code} result = suning_api_request(method, params) if not result: return None # 解析基础信息 product_info = { "product_code": safe_extract_field(result, "body.productInfo.productCode"), "product_name": safe_extract_field(result, "body.productInfo.productName"), "brand_name": safe_extract_field(result, "body.productInfo.brandName"), "category_name": safe_extract_field(result, "body.productInfo.categoryName"), "sku_list": [] } # 解析SKU列表 skus = safe_extract_field(result, "body.skuInfo.skuList", []) for sku in skus: product_info["sku_list"].append({ "sku_code": sku.get("skuCode"), "sku_attr": sku.get("skuAttr"), "sku_price": float(sku.get("salePrice", 0.0)) }) return product_info def get_stock_info(product_code: str, sku_code: str = "") -> Optional[Dict]: """调用库存API,获取商品/SKU级库存(支持区域库存)""" method = "suning.custom.stock.get" params = { "productCode": product_code, "skuCode": sku_code, "areaCodes": ",".join(MONITOR_AREAS) # 指定监控区域 } result = suning_api_request(method, params) if not result: return None # 解析总库存 total_stock = safe_extract_field(result, "body.stockInfo.stockCount", 0) # 处理库存格式(如"200+"→200) if isinstance(total_stock, str): total_stock = int(total_stock.replace("+", "")) if "+" in total_stock else 0 else: total_stock = int(total_stock) if total_stock else 0 # 解析区域库存 area_stocks = [] for area in safe_extract_field(result, "body.stockInfo.areaStockList", []): area_stocks.append({ "area_code": area.get("areaCode"), "area_name": area.get("areaName"), "stock_count": int(area.get("stockCount", 0)) if area.get("stockCount") else 0, "stock_status": area.get("stockStatus") # 有货/无货/预售 }) return { "product_code": product_code, "sku_code": sku_code, "total_stock": total_stock, "stock_status": safe_extract_field(result, "body.stockInfo.stockStatus"), "area_stocks": area_stocks } def sync_product_stock(product_code: str) -> Optional[Dict]: """联动商品+库存API,同步完整数据""" # 1. 获取商品基础信息 product_info = get_product_base_info(product_code) if not product_info or not product_info["sku_list"]: logger.error(f"商品{product_code}基础信息获取失败/无SKU") return None # 2. 批量获取SKU库存(无SKU则获取商品级库存) stock_data_list = [] # 商品级库存 product_stock = get_stock_info(product_code) if product_stock: stock_data_list.append(product_stock) # SKU级库存 for sku in product_info["sku_list"]: sku_stock = get_stock_info(product_code, sku["sku_code"]) if sku_stock: stock_data_list.append(sku_stock) # 3. 整合商品+库存数据 sync_result = { "product_base": product_info, "stock_data": stock_data_list, "sync_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) } logger.info(f"商品{product_code}库存同步完成,SKU数:{len(product_info['sku_list'])}") return sync_result
3. 库存预警机制实现
python
运行
def check_stock_warning(stock_data: Dict) -> Dict: """库存预警判断(全局阈值+区域阈值+紧急阈值)""" warning_level = "normal" # normal/warning/critical warning_msg = [] total_stock = stock_data["total_stock"] sku_code = stock_data["sku_code"] or "商品主ID" # 1. 全局阈值预警 if total_stock < CRITICAL_THRESHOLD: warning_level = "critical" warning_msg.append(f"{sku_code}库存{total_stock},低于紧急阈值{CRITICAL_THRESHOLD}") elif total_stock < WARNING_THRESHOLD: warning_level = "warning" warning_msg.append(f"{sku_code}库存{total_stock},低于预警阈值{WARNING_THRESHOLD}") # 2. 区域库存预警(监控区域缺货) for area in stock_data["area_stocks"]: if area["area_code"] in MONITOR_AREAS and area["stock_count"] == 0: warning_msg.append(f"{sku_code}在{area['area_name']}库存为0(缺货)") if warning_level != "critical": warning_level = "warning" return { "sku_code": sku_code, "warning_level": warning_level, "warning_msg": "; ".join(warning_msg), "stock_count": total_stock } def send_email_alert(warning_data: List[Dict], product_name: str): """发送库存预警邮件""" if not warning_data or not ALERT_RECEIVERS: return # 构建邮件内容 html_content = f""" <h2>苏宁商品库存预警通知</h2> <p>商品名称:{product_name}</p> <p>预警时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}</p> <table border="1" cellpadding="8" cellspacing="0"> <tr> <th>SKU/商品ID</th> <th>库存数</th> <th>预警级别</th> <th>预警信息</th> </tr> """ for warn in warning_data: level_style = "color:red;font-weight:bold;" if warn["warning_level"] == "critical" else "color:orange;" html_content += f""" <tr> <td>{warn['sku_code']}</td> <td>{warn['stock_count']}</td> <td style="{level_style}">{warn['warning_level']}</td> <td>{warn['warning_msg']}</td> </tr> """ html_content += "</table>" # 发送邮件 try: msg = MIMEText(html_content, "html", "utf-8") msg["From"] = Header(f"苏宁库存预警<{SMTP_USER}>", "utf-8") msg["To"] = Header(",".join(ALERT_RECEIVERS), "utf-8") msg["Subject"] = Header(f"【苏宁库存预警】{product_name}", "utf-8") # 连接SMTP服务器 server = smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT) server.login(SMTP_USER, SMTP_PASS) server.sendmail(SMTP_USER, ALERT_RECEIVERS, msg.as_string()) server.quit() logger.info(f"库存预警邮件发送成功,接收人:{ALERT_RECEIVERS}") except Exception as e: logger.error(f"邮件发送失败:{str(e)}") def stock_warning_handler(sync_result: Dict): """库存预警处理主函数""" product_name = sync_result["product_base"]["product_name"] warning_list = [] # 遍历库存数据,判断预警 for stock in sync_result["stock_data"]: warn_result = check_stock_warning(stock) if warn_result["warning_level"] != "normal": warning_list.append(warn_result) logger.warning(f"库存预警:{warn_result['warning_msg']}") # 触发预警(邮件) if warning_list: send_email_alert(warning_list, product_name) return warning_list
4. 主函数(批量同步 + 预警)
python
运行
def batch_sync_and_warning(product_codes: List[str]): """批量商品库存同步+预警""" sync_results = [] warning_summary = [] for product_code in product_codes: # 同步商品+库存数据 sync_result = sync_product_stock(product_code) if not sync_result: logger.error(f"商品{product_code}同步失败,跳过") continue sync_results.append(sync_result) # 库存预警判断 warnings = stock_warning_handler(sync_result) warning_summary.extend(warnings) # 输出同步/预警汇总 logger.info(f"批量同步完成:成功{len(sync_results)}/{len(product_codes)}个商品") logger.info(f"预警汇总:共{len(warning_summary)}条预警(紧急{len([w for w in warning_summary if w['warning_level']=='critical'])}条)") # 保存同步结果到JSON(可选) with open("suning_stock_sync_result.json", "w", encoding="utf-8") as f: json.dump(sync_results, f, ensure_ascii=False, indent=2) if __name__ == "__main__": # 待同步的商品ID列表 target_products = ["100032189765", "100032189888"] # 替换为真实商品ID # 批量同步+预警 batch_sync_and_warning(target_products)
四、核心逻辑拆解
1. 双 API 联动流程
是
否
是
否
输入商品ID列表
调用商品详情API
解析商品基础信息+SKU列表
是否有SKU?
批量调用库存API(SKU级)
调用库存API(商品级)
整合商品+库存数据
库存预警判断
是否触发预警?
发送预警邮件
保存同步结果
完成同步
2. 库存预警规则
| 预警级别 | 触发条件 | 处理方式 |
| 正常(normal) | 库存≥预警阈值,监控区域均有货 | 仅记录,无告警 |
| 预警(warning) | 库存≥紧急阈值且 < 预警阈值,或监控区域缺货 | 发送普通预警邮件 |
| 紧急(critical) | 库存 < 紧急阈值 | 发送紧急预警邮件(高亮) |
3. 关键优化点
- 限流控制:双 API 调用均添加
RATE_LIMIT_DELAY,避免触发苏宁 100 次 / 分钟限流; - 数据容错:库存格式兼容(如 "200+"→200)、字段缺失返回默认值,避免解析中断;
- 批量处理:支持多商品批量同步,解析结果保存至 JSON,便于后续核对;
- 预警分级:区分普通 / 紧急预警,邮件内容高亮紧急预警,提升告警优先级。
五、避坑指南与扩展建议
1. 常见问题及解决方案
| 问题类型 | 表现形式 | 解决方案 |
| 库存 API 返回区域为空 | 部分商品无区域库存数据 | 补充商品级库存兜底,标记「区域库存数据缺失」 |
| SKU 库存与商品库存不一致 | SKU 库存总和≠商品主库存 | 以 SKU 库存为准,记录库存差异,便于核查苏宁数据问题 |
| 邮件预警发送失败 | SMTP 连接超时 / 授权码错误 | 增加邮件发送重试逻辑,校验 SMTP 配置,启用邮箱 SMTP 服务 |
| 商品详情 API 无 SKU 数据 | 单规格商品返回空 SKU 列表 | 降级为商品级库存查询,标记「单规格商品」 |
| 库存状态异常(如 "预售") | 库存数为 0 但状态为预售 | 预警规则中排除预售商品,或新增「预售预警」维度 |
2. 功能扩展建议
- 定时同步:结合
APScheduler实现定时同步(如每小时同步一次):python
运行
from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(batch_sync_and_warning, "interval", hours=1, args=[target_products]) scheduler.start()
- 多渠道预警:除邮件外,集成企业微信 / 钉钉机器人发送预警:python
运行
def send_dingding_alert(warning_data: List[Dict]): """钉钉机器人预警""" ding_url = "你的钉钉机器人webhook" msg = f"【苏宁库存预警】共{len(warning_data)}条预警,紧急{len([w for w in warning_data if w['warning_level']=='critical'])}条" requests.post(ding_url, json={"msgtype": "text", "text": {"content": msg}})
- 库存变化监控:对比历史同步数据,识别库存波动(如库存骤降 > 50%):python
运行
def check_stock_change(new_stock: int, old_stock: int) -> bool: """判断库存是否异常波动""" if old_stock == 0: return False change_rate = abs(new_stock - old_stock) / old_stock return change_rate > 0.5 # 波动超50%触发告警
- 数据库存储:将同步结果存入 MySQL/PostgreSQL,便于可视化分析:python
运行
import pymysql # 连接数据库 conn = pymysql.connect(host="localhost", user="root", password="123456", db="suning_stock") cursor = conn.cursor() # 插入数据(示例) insert_sql = "INSERT INTO stock_sync (product_code, product_name, total_stock, sync_time) VALUES (%s, %s, %s, %s)" cursor.execute(insert_sql, (product_code, product_name, total_stock, sync_time)) conn.commit()
3. 合规与性能建议
- 调用频率:避免高频同步(如每秒 1 次),遵循苏宁 API 限流规则;
- 数据留存:库存数据仅留存业务必需周期,避免违规存储;
- 异常监控:添加同步失败率监控,失败率 > 20% 时触发告警;
- 资源优化:批量同步超 100 个商品时,使用多线程 / 异步调用提升效率。
六、总结
苏宁商品详情 API 与库存 API 联动的核心是「数据互补 + 实时同步 + 分级预警」:
- 数据互补:商品详情 API 提供规格维度,库存 API 提供实时库存,二者结合实现完整数据;
- 实时同步:按「商品→SKU」层级批量调用 API,兼顾效率与合规;
- 分级预警:基于阈值 + 区域维度实现多级别预警,适配不同紧急程度的库存管理需求。
该方案可直接应用于电商库存监控、供应链管理、售前库存校验等场景,通过定时同步 + 多渠道预警,确保库存数据实时准确,避免因缺货导致的订单流失。