唯品会(Vipshop)开放平台提供了丰富的 API 接口,支持开发者进行商品查询、订单管理、促销活动等电商全流程操作。自定义 API 操作基于这些基础接口,通过组合调用和业务逻辑封装,实现符合特定场景需求的功能,广泛应用于供应链管理、数据分析、智能采购等领域。
一、唯品会 API 核心特性分析
- 平台架构与认证机制
唯品会开放平台采用 OAuth 2.0 认证体系,具有以下特点:
基于 appKey 和 appSecret 的应用身份验证
通过 access_token 实现用户授权访问,有效期通常为 2 小时
支持授权码模式和客户端模式两种授权方式
所有 API 请求需通过 HTTPS 加密传输,并包含签名验证 - 核心 API 分类与功能
唯品会开放平台的 API 可分为以下几大类,构成自定义操作的基础:
功能类别 核心 API 主要功能
商品管理 product.get, product.search 获取商品详情、搜索商品、批量获取商品信息
订单管理 order.get, order.list 查询订单、获取订单详情、订单状态更新
库存管理 inventory.get, inventory.update 查询库存、更新库存数量
促销管理 promotion.get, promotion.create 获取促销信息、创建促销活动
用户管理 user.get, user.address 获取用户信息、管理用户地址 - 自定义 API 操作模式
数据聚合:组合多个 API 数据,生成综合业务报表
流程自动化:如 "商品上架→库存同步→订单处理→物流跟踪" 的自动化流程
监控预警:实时监控价格波动、库存变化、订单异常等关键指标
跨平台集成:将唯品会数据与企业 ERP、CRM 系统无缝对接
二、Python 实现方案
以下是唯品会自定义 API 操作的综合实现,包含基础调用框架、核心业务功能和数据分析模块:
import requests
import time
import hashlib
import json
import logging
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
class VipshopCustomAPI:
"""唯品会自定义 API 操作类,封装基础 API 调用和业务逻辑"""
def __init__(self, app_key: str, app_secret: str, access_token: str = ""):
"""
初始化唯品会 API 客户端
:param app_key: 应用的 appKey
:param app_secret: 应用的 appSecret
:param access_token: 访问令牌
"""
self.app_key = app_key
self.app_secret = app_secret
self.access_token = access_token
self.gateway_url = "https://api.vip.com/v2/"
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"User-Agent": "VipshopCustomAPI/1.0.0 (Python)"
})
# 频率控制
self.api_calls = defaultdict(int)
self.rate_limit = 100 # 每分钟最多调用次数
self.last_minute = time.time()
# Token 过期时间(默认 2 小时)
self.token_expiry = 0
def set_access_token(self, access_token: str, expires_in: int = 7200) -> None:
"""设置访问令牌及过期时间"""
self.access_token = access_token
self.token_expiry = time.time() + expires_in
def is_token_valid(self) -> bool:
"""检查令牌是否有效"""
return self.access_token and (time.time() < self.token_expiry - 60) # 提前 60 秒刷新
def get_access_token(self, grant_type: str = "client_credentials") -> Optional[str]:
"""
获取访问令牌
:param grant_type: 授权类型,client_credentials 或 authorization_code
:return: 访问令牌
"""
url = "https://oauth.vip.com/oauth/token"
params = {
"grant_type": grant_type,
"appKey": self.app_key,
"appSecret": self.app_secret
}
try:
response = self.session.post(url, params=params, timeout=10)
response.raise_for_status()
result = response.json()
if "access_token" in result:
self.set_access_token(
result["access_token"],
result.get("expires_in", 7200)
)
logging.info(f"成功获取 access_token,有效期 {result.get('expires_in', 7200)} 秒")
return result["access_token"]
else:
logging.error(f"获取 access_token 失败: {result.get('error_description')}")
return None
except Exception as e:
logging.error(f"获取 access_token 异常: {str(e)}")
return None
def _generate_sign(self, params: Dict, timestamp: str) -> str:
"""生成签名"""
# 排序参数
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接签名字符串
sign_str = f"{self.app_secret}{timestamp}"
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.app_secret
# 计算 SHA256
return hashlib.sha256(sign_str.encode('utf-8')).hexdigest().upper()
def _check_rate_limit(self) -> bool:
"""检查 API 调用频率限制"""
current_time = time.time()
# 每分钟重置计数
if current_time - self.last_minute > 60:
self.api_calls.clear()
self.last_minute = current_time
# 检查是否超过限制
if sum(self.api_calls.values()) >= self.rate_limit:
sleep_time = 60 - (current_time - self.last_minute)
logging.warning(f"API 调用频率超限,等待 {sleep_time:.1f} 秒")
time.sleep(sleep_time + 1)
self.api_calls.clear()
self.last_minute = current_time
return True
def call_api(self, api_name: str, params: Dict = None, method: str = "GET") -> Optional[Dict]:
"""
调用唯品会 API
:param api_name: API 名称,如 product.get
:param params: API 参数
:param method: 请求方法,GET 或 POST
:return: API 返回结果
"""
# 检查并刷新令牌
if not self.is_token_valid():
logging.info("access_token 已过期或无效,尝试重新获取")
if not self.get_access_token():
return None
# 检查频率限制
if not self._check_rate_limit():
return None
# 构建基础参数
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
base_params = {
"appKey": self.app_key,
"access_token": self.access_token,
"timestamp": timestamp
}
# 合并参数
if params:
base_params.update(params)
# 生成签名
sign = self._generate_sign(base_params, timestamp)
base_params["sign"] = sign
try:
# 构建完整 URL
url = f"{self.gateway_url}{api_name}"
# 发送请求
if method.upper() == "GET":
response = self.session.get(url, params=base_params, timeout=15)
else:
response = self.session.post(url, json=base_params, timeout=15)
response.raise_for_status()
# 解析响应
result = response.json()
# 处理错误
if result.get("code") != 0:
logging.error(f"API 调用错误: {result.get('message')} (错误码: {result.get('code')})")
return None
# 返回结果
return result.get("data", {})
except requests.exceptions.RequestException as e:
logging.error(f"API 请求异常: {str(e)}")
return None
except json.JSONDecodeError:
logging.error(f"API 响应解析失败: {response.text[:200]}...")
return None
finally:
# 记录 API 调用
self.api_calls[api_name] += 1
# ------------------------------
# 商品相关自定义操作
# ------------------------------
def search_products(self, keyword: str, page: int = 1, page_size: int = 30,
min_price: float = None, max_price: float = None,
category_id: int = None) -> Dict:
"""
搜索商品
:param keyword: 搜索关键词
:param page: 页码
:param page_size: 每页数量
:param min_price: 最低价格
:param max_price: 最高价格
:param category_id: 分类 ID
:return: 商品列表及分页信息
"""
params = {
"keyword": keyword,
"page": page,
"pageSize": page_size
}
if min_price is not None:
params["minPrice"] = min_price
if max_price is not None:
params["maxPrice"] = max_price
if category_id is not None:
params["categoryId"] = category_id
result = self.call_api("product/search", params)
if not result:
return {"total": 0, "products": []}
total = int(result.get("totalCount", 0))
products = result.get("products", [])
return {
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size,
"products": self._format_products(products)
}
def get_product_details(self, product_id: str) -> Optional[Dict]:
"""获取商品详情"""
params = {
"productId": product_id
}
result = self.call_api("product/get", params)
if not result:
return None
return self._format_product(result)
def batch_get_products(self, product_ids: List[str]) -> List[Dict]:
"""批量获取商品信息"""
if not product_ids:
return []
products = []
# 唯品会 API 一次最多支持 20 个 ID
batch_size = 20
for i in range(0, len(product_ids), batch_size):
batch_ids = product_ids[i:i+batch_size]
params = {
"productIds": ",".join(batch_ids)
}
result = self.call_api("product/batchGet", params)
if result and "products" in result:
formatted = [self._format_product(p) for p in result["products"]]
products.extend(formatted)
# 控制调用频率
time.sleep(1)
logging.info(f"成功获取 {len(products)} 件商品信息")
return products
def analyze_product_prices(self, keyword: str, max_pages: int = 3) -> Dict:
"""分析特定关键词商品的价格分布"""
all_products = []
page = 1
while page <= max_pages:
logging.info(f"搜索商品第 {page} 页: {keyword}")
result = self.search_products(keyword, page)
if not result["products"]:
break
all_products.extend(result["products"])
if page >= result["total_pages"]:
break
page += 1
time.sleep(1)
# 提取价格信息
prices = [float(p["currentPrice"]) for p in all_products]
# 价格统计
price_stats = {}
if prices:
price_stats = {
"min": min(prices),
"max": max(prices),
"avg": sum(prices) / len(prices),
"median": self._calculate_median(prices)
}
# 折扣分析
discount_stats = {}
original_prices = [
float(p["originalPrice"]) for p in all_products
if p["originalPrice"] and float(p["originalPrice"]) > 0
]
if original_prices and prices:
discounts = [(1 - prices[i] / original_prices[i]) * 100
for i in range(min(len(prices), len(original_prices)))]
discount_stats = {
"avg_discount": sum(discounts) / len(discounts),
"max_discount": max(discounts) if discounts else 0
}
# 分类分析
category_counts = defaultdict(int)
for product in all_products:
category = product.get("categoryName", "未知分类")
category_counts[category] += 1
return {
"total_products": len(all_products),
"price_stats": price_stats,
"discount_stats": discount_stats,
"top_categories": sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:5],
"products": all_products
}
# ------------------------------
# 订单相关自定义操作
# ------------------------------
def get_recent_orders(self, days: int = 7, order_status: int = -1) -> List[Dict]:
"""
获取最近 N 天的订单
:param days: 天数
:param order_status: 订单状态,-1-全部,0-待付款,1-已付款等
:return: 订单列表
"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
params = {
"startTime": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"endTime": end_time.strftime("%Y-%m-%d %H:%M:%S"),
"page": 1,
"pageSize": 50,
"orderStatus": order_status
}
all_orders = []
while True:
logging.info(f"获取订单第 {params['page']} 页")
result = self.call_api("order/list", params, method="POST")
if not result or "orders" not in result:
break
orders = result["orders"]
all_orders.extend(orders)
# 检查是否还有更多页
total_count = int(result.get("totalCount", 0))
if len(all_orders) >= total_count:
break
params["page"] += 1
time.sleep(1)
logging.info(f"成功获取 {len(all_orders)} 条订单")
return self._format_orders(all_orders)
def analyze_order_data(self, orders: List[Dict]) -> Dict:
"""分析订单数据"""
if not orders:
return {}
# 订单状态分布
state_counts = defaultdict(int)
# 每日订单统计
daily_stats = defaultdict(lambda: {"count": 0, "amount": 0})
# 商品销售统计
product_stats = defaultdict(lambda: {"count": 0, "amount": 0})
for order in orders:
# 订单状态统计
state_counts[order["orderStatusDesc"]] += 1
# 每日统计
if order["createTime"]:
date_str = order["createTime"].split()[0]
daily_stats[date_str]["count"] += 1
daily_stats[date_str]["amount"] += float(order["orderAmount"])
# 商品销售统计
for item in order.get("items", []):
product_id = item["productId"]
product_stats[product_id]["count"] += item["quantity"]
product_stats[product_id]["amount"] += float(item["price"]) * item["quantity"]
# 计算总销售额和订单数
total_orders = len(orders)
total_sales = sum(float(order["orderAmount"]) for order in orders)
# 转换为排序后的列表
sorted_dates = sorted(daily_stats.keys())
daily_sales = [daily_stats[date]["amount"] for date in sorted_dates]
daily_counts = [daily_stats[date]["count"] for date in sorted_dates]
# 热销商品
top_products = sorted(product_stats.items(),
key=lambda x: x[1]["amount"],
reverse=True)[:10]
return {
"total_orders": total_orders,
"total_sales": total_sales,
"avg_order_value": total_sales / total_orders if total_orders else 0,
"state_distribution": dict(state_counts),
"daily_trend": {
"dates": sorted_dates,
"sales": daily_sales,
"orders": daily_counts
},
"top_products": top_products
}
# ------------------------------
# 库存相关自定义操作
# ------------------------------
def get_product_inventory(self, product_id: str) -> Optional[Dict]:
"""获取商品库存"""
params = {
"productId": product_id
}
result = self.call_api("inventory/get", params)
if not result:
return None
return {
"product_id": product_id,
"total_stock": result.get("totalStock", 0),
"locked_stock": result.get("lockedStock", 0), # 锁定库存
"available_stock": result.get("availableStock", 0), # 可用库存
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
def check_low_stock(self, product_ids: List[str], threshold: int = 20) -> List[Dict]:
"""检查低库存商品"""
low_stock_items = []
for product_id in product_ids:
inventory = self.get_product_inventory(product_id)
if not inventory:
continue
if inventory["available_stock"] <= threshold:
# 获取商品基本信息
product = self.get_product_details(product_id)
if product:
inventory["product_name"] = product["productName"]
inventory["current_price"] = product["currentPrice"]
low_stock_items.append(inventory)
time.sleep(0.5)
logging.info(f"发现 {len(low_stock_items)} 个低库存商品")
return low_stock_items
# ------------------------------
# 工具方法
# ------------------------------
def _format_products(self, products: List[Dict]) -> List[Dict]:
"""格式化商品列表数据"""
return [self._format_product(p) for p in products]
def _format_product(self, product: Dict) -> Dict:
"""格式化单个商品数据"""
return {
"productId": product.get("productId"),
"productName": product.get("productName", ""),
"currentPrice": product.get("currentPrice", 0),
"originalPrice": product.get("originalPrice", 0),
"discount": product.get("discount", 0),
"categoryId": product.get("categoryId", ""),
"categoryName": product.get("categoryName", ""),
"brandId": product.get("brandId", ""),
"brandName": product.get("brandName", ""),
"salesCount": product.get("salesCount", 0),
"commentCount": product.get("commentCount", 0),
"score": product.get("score", 0),
"mainImage": product.get("mainImage", ""),
"detailUrl": product.get("detailUrl", "")
}
def _format_orders(self, orders: List[Dict]) -> List[Dict]:
"""格式化订单数据"""
formatted = []
for order in orders:
# 处理订单项
items = []
if "items" in order:
for item in order["items"]:
items.append({
"productId": item.get("productId"),
"productName": item.get("productName"),
"quantity": item.get("quantity", 1),
"price": item.get("price", 0),
"totalPrice": item.get("totalPrice", 0)
})
formatted.append({
"orderId": order.get("orderId"),
"orderAmount": order.get("orderAmount", 0),
"orderStatus": order.get("orderStatus", 0),
"orderStatusDesc": order.get("orderStatusDesc", ""),
"payTime": order.get("payTime", ""),
"shipTime": order.get("shipTime", ""),
"receiveTime": order.get("receiveTime", ""),
"createTime": order.get("createTime", ""),
"buyerName": order.get("buyerName", ""),
"items": items
})
return formatted
def _calculate_median(self, data: List[float]) -> float:
"""计算中位数"""
sorted_data = sorted(data)
n = len(sorted_data)
if n % 2 == 1:
return round(sorted_data[n//2], 2)
else:
return round((sorted_data[n//2 - 1] + sorted_data[n//2]) / 2, 2)
def visualize_analysis(self, analysis: Dict, title: str, output_file: str) -> None:
"""可视化分析结果"""
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False
fig, ax = plt.subplots(figsize=(10, 6))
if "price_stats" in analysis:
# 价格分布直方图
prices = [float(p["currentPrice"]) for p in analysis["products"]]
ax.hist(prices, bins=10, alpha=0.7, color='skyblue')
ax.axvline(analysis["price_stats"]["avg"], color='r', linestyle='dashed', linewidth=1,
label=f'平均价格: {analysis["price_stats"]["avg"]:.2f}')
ax.set_title(f"{title} - 商品价格分布")
ax.set_xlabel("价格 (元)")
ax.set_ylabel("商品数量")
ax.legend()
elif "daily_trend" in analysis:
# 销售趋势图
x = np.arange(len(analysis["daily_trend"]["dates"]))
width = 0.35
ax.bar(x - width/2, analysis["daily_trend"]["orders"], width, label='订单数')
ax_twin = ax.twinx()
ax_twin.plot(x, analysis["daily_trend"]["sales"], 'r-', marker='o', label='销售额')
ax.set_title(f"{title} - 销售趋势")
ax.set_xlabel("日期")
ax.set_ylabel("订单数")
ax_twin.set_ylabel("销售额 (元)")
ax.set_xticks(x)
ax.set_xticklabels(analysis["daily_trend"]["dates"], rotation=45)
# 合并图例
lines, labels = ax.get_legend_handles_labels()
lines2, labels2 = ax_twin.get_legend_handles_labels()
ax.legend(lines + lines2, labels + labels2, loc='upper left')
elif "state_distribution" in analysis:
# 订单状态饼图
labels = list(analysis["state_distribution"].keys())
sizes = list(analysis["state_distribution"].values())
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
ax.set_title(f"{title} - 订单状态分布")
ax.axis('equal')
plt.tight_layout()
plt.savefig(output_file)
plt.close()
logging.info(f"分析图表已保存至 {output_file}")
def export_to_excel(self, data: List[Dict], filename: str, sheet_name: str = "数据") -> None:
"""将数据导出到 Excel"""
if not data:
logging.warning("没有可导出的数据")
return
df = pd.DataFrame(data)
df.to_excel(filename, sheet_name=sheet_name, index=False)
logging.info(f"数据已导出至 {filename}")
示例调用
if name == "main":
# 替换为实际的参数(从唯品会开放平台获取)
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
# 初始化 API 客户端
api = VipshopCustomAPI(APP_KEY, APP_SECRET)
# 获取访问令牌
if not api.get_access_token():
logging.error("无法获取 access_token,程序退出")
exit(1)
# 示例 1: 商品价格分析
print("=== 商品价格分析 ===")
KEYWORD = "女装 连衣裙" # 搜索关键词
price_analysis = api.analyze_product_prices(KEYWORD, max_pages=3)
print(f"搜索关键词: {KEYWORD}")
print(f"获取商品总数: {price_analysis['total_products']}")
if price_analysis["price_stats"]:
print(f"价格范围: {price_analysis['price_stats']['min']}-{price_analysis['price_stats']['max']} 元")
print(f"平均价格: {price_analysis['price_stats']['avg']:.2f} 元")
if price_analysis["discount_stats"]:
print(f"平均折扣: {price_analysis['discount_stats']['avg_discount']:.1f}%")
print(f"最大折扣: {price_analysis['discount_stats']['max_discount']:.1f}%")
print("热门分类 TOP3:")
for i, (category, count) in enumerate(price_analysis["top_categories"][:3], 1):
print(f" {i}. {category}: {count} 款商品")
api.visualize_analysis(price_analysis, f"商品价格分析: {KEYWORD}", "product_price_analysis.png")
# 示例 2: 订单数据分析
print("\n=== 订单数据分析 ===")
recent_orders = api.get_recent_orders(days=15) # 获取最近 15 天订单
order_analysis = api.analyze_order_data(recent_orders)
print(f"总订单数: {order_analysis['total_orders']}")
print(f"总销售额: {order_analysis['total_sales']:.2f} 元")
print(f"平均订单金额: {order_analysis['avg_order_value']:.2f} 元")
print("订单状态分布:")
for state, count in order_analysis["state_distribution"].items():
print(f" {state}: {count} 单 ({count/order_analysis['total_orders']*100:.1f}%)")
api.visualize_analysis(order_analysis, "最近15天订单分析", "order_analysis.png")
api.export_to_excel(recent_orders, "recent_orders.xlsx", "订单数据")
# 示例 3: 库存检查
print("\n=== 库存检查 ===")
if price_analysis["products"]:
# 选取前 10 个商品检查库存
product_ids = [p["productId"] for p in price_analysis["products"][:10]]
low_stock = api.check_low_stock(product_ids, threshold=20)
if low_stock:
print(f"发现 {len(low_stock)} 个低库存商品:")
for item in low_stock:
print(f" {item['product_name'][:20]}... (ID: {item['product_id']})")
print(f" 可用库存: {item['available_stock']}, 价格: {item['current_price']} 元")
else:
print("未发现低库存商品")
三、唯品会 API 操作注意事项
- 调用限制与规范
API 调用频率:唯品会开放平台对 API 调用有严格的频率限制,不同 API 有不同的 QPS 限制,通常为 10-100 次 / 分钟
权限管理:大部分 API 需要申请相应权限才能调用,特别是订单和支付相关接口
数据缓存策略:商品基本信息等不常变化的数据建议缓存,减少 API 调用次数
签名规范:严格按照唯品会的签名算法生成签名,确保参数排序和拼接正确
Token 管理:access_token 有效期为 2 小时,需要在过期前及时刷新 - 常见错误及解决方案
错误码 说明 解决方案
10001 系统错误 重试请求,最多 3 次,采用指数退避策略
10002 参数错误 检查参数格式和取值范围是否符合要求
10003 签名错误 重新生成签名,检查参数排序和 appSecret 是否正确
10004 access_token 无效或过期 重新获取 access_token
10005 权限不足 在开放平台申请相应 API 的调用权限
10006 调用频率超限 降低调用频率,实现频率控制
20001 商品不存在 检查商品 ID 是否正确,商品可能已下架 - 性能优化建议
批量操作:优先使用支持批量处理的 API(如 product/batchGet)减少请求次数
按需获取字段:通过 fields 参数指定需要的字段,减少数据传输量
分页优化:合理设置分页大小,避免单次请求过多数据
并发控制:多线程调用时控制并发数,避免触发频率限制
智能重试:针对特定错误码实现智能重试,提高成功率
增量同步:通过时间戳参数只获取增量数据,减少处理量
四、应用场景与扩展建议
典型应用场景
商品监控系统:实时监控商品价格、库存和促销信息
销售分析平台:分析销售趋势、热门商品和用户购买行为
库存预警系统:自动监控库存水平,生成补货提醒
订单管理自动化:自动处理订单、更新状态、同步物流信息
多平台整合:将唯品会数据与其他电商平台数据整合分析
扩展建议
实现实时数据看板:可视化展示关键运营指标,支持实时监控
开发智能推荐系统:基于用户购买历史和商品特性,推荐相关商品
构建价格监控模型:分析价格变动规律,预测未来价格走势
集成供应链系统:将销售数据与库存、采购系统无缝对接
开发异常检测机制:自动识别异常订单、异常价格变动,及时预警
唯品会自定义 API 操作的核心价值在于将平台数据与企业业务流程深度融合,通过自动化和智能化提升运营效率和决策质量。在实际应用中,需特别注意 API 调用规范和性能优化,同时根据业务需求不断迭代自定义功能,以适应快速变化的电商环境。