在商用场景下,针对微店商品详情 API(如 weidian.item.get)的限流问题,需从并发控制和缓存策略两方面设计解决方案,以平衡性能、成本与合规性。以下是具体设计指南:
一、并发控制:分层限流与任务调度
- 店铺级限流
- QPS 配额分配:微店对单店铺授权的 API 调用有 QPS 上限(通常 50-200,依店铺等级调整),跨店铺调用共享全局 QPS(如 appkey 总 QPS=1000)。需按店铺等级分配配额,核心店铺优先保障,普通店铺动态调整。
- 漏桶算法本地限流:在执行节点(如微服务集群)按店铺 ID 粒度实现漏桶限流。例如,店铺 A 的节点本地 QPS≤50,避免单节点超店铺配额。
- 分页请求控制:调用商品列表 API(如
weidian.items.list)获取商品总数后,按页码生成分页任务(如shop_id=123&page=1)。每页请求携带page_size=20,减少单次请求数据量。
- 分布式任务调度
- 双层队列设计:
- 店铺队列:用 RabbitMQ 延迟队列存储待处理店铺,按“店铺授权有效期”排序,优先处理即将过期的 token 对应店铺。
- 商品队列:每个店铺的商品任务单独分片(如按店铺 ID 哈希分区),避免不同店铺商品任务混流。
- 动态调度策略:
- 店铺调度:从店铺队列消费店铺信息,刷新即将过期的 access_token(提前 30 分钟触发),生成全量商品分页任务。
- 商品调度:按店铺分配 QPS 配额,协调节点资源。例如,店铺 A 分配 50 QPS,店铺 B 分配 30 QPS,确保全局 QPS 不超限。
- 异常处理与重试机制
- 429 错误(店铺 QPS 超限):延迟重试(延迟时间 = (当前重试次数+1)*10s,最多 5 次),并临时下调该店铺配额(如降 10%)。
- 401 错误(token 失效):触发店铺调度模块刷新 token,任务回退到店铺队列头部。
- 5xx 错误(微店服务错误):立即重试(最多 3 次),失败后标记为“待人工处理”。
二、缓存策略:多级缓存与数据一致性
- 热点商品缓存
- Redis 集群缓存:存储近 24 小时有更新的商品,key 为
item:{item_id},过期时间 2 小时(微店商品更新较频繁)。 - 本地缓存补充:在执行节点使用 Caffeine 等本地缓存,存储高频访问商品,减少 Redis 查询压力。
- 全量数据持久化
- MySQL 分库分表:按
item_id哈希分片存储全量商品,字段含shop_id、title、price、stock、update_time等。 - Elasticsearch 索引:建立索引支持按店铺、分类、价格区间检索,提升查询效率。
- 缓存更新机制
- 增量标记:每次全量请求后,对比商品的
update_time,标记“新增/修改/下架”状态,便于业务系统同步。 - 事件驱动更新:监听微店系统的通知或事件(如商品更新、下架),实时更新缓存数据。
三、监控与校验:保障数据完整性
- 监控指标
- 店铺维度:任务完成率(已完成页数/总页数)、失败页码分布、token 刷新成功率。
- 商品维度:单店铺商品总数(API 返回)与实际存储数的差值、重复商品 ID 数。
- 系统维度:节点并发数、代理 IP 可用率、队列堆积量(单店铺商品队列堆积超 100 页即告警)。
- 全量校验
- 每日抽检:随机抽取 10% 的店铺,重新调用其商品列表 API,对比总页数与实际完成页数,差异超 5% 则触发补爬。
- 下架商品校验:对 API 返回
is_deleted=true的商品,校验是否在存储中标记,避免垃圾数据堆积。
四、优化实践:提升效率与稳定性
- 批量请求优化
- 微店商品详情接口支持批量查询(如
weidian.item.get一次查 10 个商品 ID),比单条调用节省 90% 请求量。 - 示例代码(Python):
python import requests def batch_get_items(access_token, item_ids): url = "https://api.weidian.com/micro/item_get" params = { "accesstoken": access_token, "num_iids": ",".join(map(str, item_ids)), "fields": "item_id,title,price,stock,images" # 精简字段 } response = requests.get(url, params=params) return response.json()
- 异步与连接池
- 异步请求:使用
asyncio+aiohttp实现并发,示例:
python import asyncio import aiohttp async def fetch_items(session, item_id, access_token): url = f"https://api.weidian.com/micro/item_get?accesstoken={access_token}&num_iid={item_id}" async with session.get(url) as response: return await response.json() async def main(): access_token = "your_token" item_ids = [123, 456, 789] async with aiohttp.ClientSession() as session: tasks = [fetch_items(session, id, access_token) for id in item_ids] results = await asyncio.gather(*tasks) for item in results: print(item) asyncio.run(main())
- 连接池:使用
requests.Session()复用连接,减少建立/关闭开销。
- 分页断点续传
- 记录每个店铺的已完成页码(Redis key=
shop:{shop_id}:last_page),宕机后从last_page+1继续,避免重复请求。