在电商 API 开发中,平台通常会对接口调用频率进行限制(如京东 100 次 / 分钟、淘宝 60 次 / 分钟),这往往成为系统性能的瓶颈。本文将分享一套完整的高性能 API 调用方案,通过异步请求、批量处理、智能限流等技术,将 API 调用效率提升 10 倍以上。
一、频率限制与性能瓶颈分析
- 主流电商平台 API 频率限制
平台 普通接口限制 高权限接口限制 封禁策略
京东 100 次 / 分钟 50 次 / 分钟(订单接口) 超限后封禁 10-30 分钟
淘宝 60 次 / 分钟 30 次 / 分钟(用户接口) 超限后返回 429 错误码
拼多多 200 次 / 分钟 100 次 / 分钟 超限后限制 IP 访问 - 传统同步请求的性能瓶颈
假设我们需要采集 1000 个商品信息,使用同步请求:
每次请求耗时 0.5 秒
京东限制 100 次 / 分钟
总耗时 = (1000/100) * 60 = 600 秒(10 分钟)
性能瓶颈点:
I/O 等待时间占比高(等待 API 响应时 CPU 空闲)
无法利用多核 CPU 优势
串行请求导致整体吞吐量低
二、异步请求优化:从串行到并行 - 同步请求 vs 异步请求对比
同步请求代码示例:
python
运行
import requests
import time
def get_product(sku_id):
url = "https://api.example.com/product"
params = {"sku_id": sku_id}
response = requests.get(url, params=params)
return response.json()
start_time = time.time()
sku_list = [1001, 1002, ..., 1000] # 1000个商品
for sku_id in sku_list:
product = get_product(sku_id)
# 处理商品数据
print(f"总耗时: {time.time() - start_time}秒") # 约10分钟
异步请求代码示例:
python
运行
import aiohttp
import asyncio
import time
async def get_product(session, sku_id):
url = "https://api.example.com/product"
params = {"sku_id": sku_id}
async with session.get(url, params=params) as response:
return await response.json()
async def main():
start_time = time.time()
sku_list = [1001, 1002, ..., 1000] # 1000个商品
async with aiohttp.ClientSession() as session:
tasks = []
for sku_id in sku_list:
tasks.append(get_product(session, sku_id))
results = await asyncio.gather(*tasks)
# 处理所有商品数据
print(f"总耗时: {time.time() - start_time}秒") # 约10秒(理论值)
asyncio.run(main())
- 异步请求性能测试对比
请求方式 商品数量 总耗时 QPS 资源利用率
同步请求 1000 600 秒 1.67 CPU 利用率 10%
异步请求 1000 10 秒 100 CPU 利用率 80%
三、智能限流与批量处理 - 令牌桶限流算法实现
python
运行
import asyncio
from datetime import datetime, timedelta
class TokenBucket:
def init(self, rate: int, capacity: int = None):
"""
初始化令牌桶
rate: 每秒生成的令牌数
capacity: 令牌桶容量,默认等于rate
"""
self.rate = rate
self.capacity = capacity or rate
self.tokens = self.capacity
self.last_update = datetime.now()
self.lock = asyncio.Lock()
async def acquire(self):
"""获取令牌"""
async with self.lock:
# 计算从上次更新到现在应该生成的令牌数
now = datetime.now()
delta = (now - self.last_update).total_seconds()
new_tokens = delta * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
# 如果没有令牌,等待
while self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
now = datetime.now()
delta = (now - self.last_update).total_seconds()
new_tokens = delta * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
# 消耗一个令牌
self.tokens -= 1
批量 API 调用实现
python
运行
class BatchAPIClient:
def init(self, api_client, batch_size=20, rate_limit=100):""" 初始化批量API客户端 batch_size: 每批请求的数量 rate_limit: 每分钟请求限制 """ self.api_client = api_client self.batch_size = batch_size self.token_bucket = TokenBucket(rate_limit / 60) # 转换为每秒令牌数async def batch_get_products(self, sku_list):
"""批量获取商品信息""" results = [] batches = [sku_list[i:i+self.batch_size] for i in range(0, len(sku_list), self.batch_size)] for batch in batches: # 等待获取令牌 await self.token_bucket.acquire() # 构建批量请求参数 params = {"skuIds": ",".join(map(str, batch))} # 调用批量API result = await self.api_client.call("jingdong.ware.productinfo.get", params) results.extend(result.get("productInfoList", [])) return results四、连接池与长连接优化
- aiohttp 连接池配置
python
运行
import aiohttp
创建带有连接池的客户端会话
async def create_session(max_connections=100, keepalive_timeout=300):
"""
创建带连接池的aiohttp会话
max_connections: 最大连接数
keepalive_timeout: 连接保持活动的时间(秒)
"""
conn = aiohttp.TCPConnector(
limit=max_connections, # 最大连接数
limit_per_host=max_connections, # 每个主机的最大连接数
enable_cleanup_closed=True, # 启用关闭连接的清理
force_close=False, # 不强制关闭连接
keepalive_timeout=keepalive_timeout # 连接保持活动的时间
)
session = aiohttp.ClientSession(connector=conn)
return session
- 连接池性能测试
配置 1000 请求耗时 连接建立次数 资源利用率
无连接池 25 秒 1000 次 CPU 60%
连接池 (50) 12 秒 50 次 CPU 80%
连接池 (200) 8 秒 200 次 CPU 90%
五、请求优先级调度与负载均衡 - 优先级队列实现
python
运行
import asyncio
from enum import IntEnum
class Priority(IntEnum):
HIGH = 1
MEDIUM = 2
LOW = 3
class PriorityQueue:
def init(self):
"""初始化优先级队列"""
self.queues = {
Priority.HIGH: asyncio.Queue(),
Priority.MEDIUM: asyncio.Queue(),
Priority.LOW: asyncio.Queue()
}
async def put(self, item, priority=Priority.MEDIUM):
"""添加任务到队列"""
await self.queues[priority].put(item)
async def get(self):
"""从队列获取任务(优先处理高优先级)"""
while True:
# 先检查高优先级队列
if not self.queues[Priority.HIGH].empty():
return await self.queues[Priority.HIGH].get()
# 再检查中优先级队列
if not self.queues[Priority.MEDIUM].empty():
return await self.queues[Priority.MEDIUM].get()
# 最后检查低优先级队列
if not self.queues[Priority.LOW].empty():
return await self.queues[Priority.LOW].get()
# 所有队列都为空,等待有任务加入
await asyncio.wait([
self.queues[Priority.HIGH].join(),
self.queues[Priority.MEDIUM].join(),
self.queues[Priority.LOW].join()
], return_when=asyncio.FIRST_COMPLETED)
- 多 API 域名负载均衡
python
运行
import random
class LoadBalancer:
def init(self, api_domains):
"""初始化负载均衡器"""
self.api_domains = api_domains
self.health_status = {domain: True for domain in api_domains}
self.failure_count = {domain: 0 for domain in api_domains}
def get_domain(self):
"""获取可用域名"""
# 过滤掉不健康的域名
healthy_domains = [domain for domain, status in self.health_status.items() if status]
if not healthy_domains:
# 如果没有健康域名,重置所有域名状态
for domain in self.api_domains:
self.health_status[domain] = True
healthy_domains = self.api_domains
# 随机选择一个健康域名
return random.choice(healthy_domains)
def report_failure(self, domain):
"""报告域名失败"""
self.failure_count[domain] += 1
if self.failure_count[domain] >= 3:
self.health_status[domain] = False
# 可以添加自动恢复逻辑,例如定期检查域名状态
六、异常处理与熔断机制
- 指数退避重试实现
python
运行
import asyncio
import random
import logging
logger = logging.getLogger(name)
async def retry_with_backoff(coro, max_retries=5, base_delay=1):
"""
带指数退避的重试机制
coro: 要执行的协程
max_retries: 最大重试次数
base_delay: 基础延迟时间(秒)
"""
retries = 0
while True:
try:
return await coro()
except Exception as e:
retries += 1
if retries > max_retries:
logger.error(f"达到最大重试次数 ({max_retries}),错误: {str(e)}")
raise
# 指数退避:base_delay * (2^n) + 随机抖动
delay = base_delay * (2 ** retries) + random.uniform(0, 1)
logger.warning(f"请求失败,将在 {delay:.2f} 秒后重试 ({retries}/{max_retries}): {str(e)}")
await asyncio.sleep(delay)
- 熔断器模式实现
python
运行
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = 1 # 关闭状态(正常工作)
OPEN = 2 # 打开状态(熔断中)
HALF_OPEN = 3 # 半开状态(尝试恢复)
class CircuitBreaker:
def init(self, failure_threshold=5, recovery_timeout=30):
"""
初始化熔断器
failure_threshold: 连续失败次数阈值
recovery_timeout: 熔断后尝试恢复的时间(秒)
"""
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
async def execute(self, coro):
"""执行请求"""
if self.state == CircuitState.OPEN:
# 检查是否可以尝试恢复
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise Exception("熔断器已打开,拒绝请求")
try:
result = await coro()
# 请求成功,重置熔断器状态
self._reset()
return result
except Exception as e:
# 请求失败,记录失败
self._record_failure()
raise
def _record_failure(self):
"""记录失败"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _reset(self):
"""重置熔断器"""
self.failure_count = 0
self.state = CircuitState.CLOSED
七、完整高性能 API 客户端实现
python
运行
import aiohttp
import asyncio
import time
import random
import logging
from typing import Dict, List, Any, Optional
logger = logging.getLogger(name)
class HighPerformanceAPIClient:
def init(
self,
app_key: str,
app_secret: str,
base_urls: List[str],
max_connections: int = 100,
rate_limit: int = 100, # 每分钟请求数
max_retries: int = 3,
timeout: int = 10
):
"""
高性能API客户端
app_key: 应用密钥
app_secret: 应用密钥
base_urls: API基础URL列表(用于负载均衡)
max_connections: 最大连接数
rate_limit: 每分钟请求限制
max_retries: 最大重试次数
timeout: 请求超时时间(秒)
"""
self.app_key = app_key
self.app_secret = app_secret
self.base_urls = base_urls
self.current_url_index = 0
self.max_connections = max_connections
self.rate_limit = rate_limit
self.max_retries = max_retries
self.timeout = timeout
self.session = None
self.token_bucket = TokenBucket(rate_limit / 60) # 转换为每秒令牌数
self.circuit_breaker = CircuitBreaker()
async def initialize(self):
"""初始化客户端"""
conn = aiohttp.TCPConnector(
limit=self.max_connections,
limit_per_host=self.max_connections,
enable_cleanup_closed=True,
force_close=False
)
self.session = aiohttp.ClientSession(connector=conn)
async def close(self):
"""关闭客户端"""
if self.session:
await self.session.close()
def generate_sign(self, params: Dict[str, Any]) -> str:
"""生成API签名"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
sign_str = self.app_secret
for k, v in sorted_params:
sign_str += f"{k}{v}"
sign_str += self.app_secret
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def _get_next_base_url(self) -> str:
"""获取下一个基础URL(负载均衡)"""
url = self.base_urls[self.current_url_index]
self.current_url_index = (self.current_url_index + 1) % len(self.base_urls)
return url
async def call(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""调用API"""
if params is None:
params = {}
# 添加公共参数
common_params = {
"app_key": self.app_key,
"method": method,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"format": "json",
"v": "2.0",
"sign_method": "md5"
}
# 合并参数
all_params = {**common_params, **params}
# 生成签名
sign = self.generate_sign(all_params)
all_params["sign"] = sign
# 执行请求(带熔断和重试)
async def _execute_request():
# 获取令牌
await self.token_bucket.acquire()
# 选择基础URL
base_url = self._get_next_base_url()
# 发送请求
try:
async with self.session.post(
base_url,
data=all_params,
timeout=self.timeout
) as response:
if response.status != 200:
raise Exception(f"HTTP错误 {response.status}")
result = await response.json()
# 检查业务错误
if "error_response" in result:
error_code = result["error_response"].get("code")
error_msg = result["error_response"].get("msg")
# 特定错误码需要重试
if error_code in ["429", "500", "503"]:
raise Exception(f"API错误 {error_code}: {error_msg}")
logger.error(f"API错误 {error_code}: {error_msg}")
return None
return result
except Exception as e:
logger.warning(f"API请求异常: {str(e)}")
raise
# 带重试和熔断的请求执行
return await retry_with_backoff(
lambda: self.circuit_breaker.execute(_execute_request),
max_retries=self.max_retries
)
八、性能测试与优化效果
- 测试环境
服务器配置:8 核 16G,100Mbps 带宽
测试工具:locust + asyncio
API 接口:京东商品信息查询(支持批量) - 测试结果对比
优化策略 QPS 响应时间 (ms) 成功率 资源利用率
同步请求 15 650 98% CPU 20%
异步请求 120 83 95% CPU 60%
- 连接池 (100) 280 35 97% CPU 75%
- 令牌桶限流 (100 / 分钟) 100 100 100% CPU 50%
- 批量请求 (20 个 / 批) 500 40 99% CPU 85%
- 负载均衡 (3 域名) 750 27 99.5% CPU 90%
九、最佳实践总结
- 异步化改造
使用 aiohttp 替代 requests 进行异步请求
合理设置连接池大小(建议为 CPU 核心数 * 5) - 智能限流策略
使用令牌桶算法控制请求频率
根据 API 限制动态调整令牌生成速率 - 批量处理优化
优先使用支持批量的 API 接口
控制批量大小(通常 10-50 个为最佳) - 高可用设计
实现多域名负载均衡
添加熔断机制防止级联故障
使用指数退避重试策略 - 监控与调优
监控关键指标:QPS、响应时间、错误率
根据监控数据动态调整连接池大小和限流参数
定期进行性能压测,发现瓶颈点
通过这套高性能 API 调用方案,我们可以突破平台频率限制,充分利用服务器资源,将 API 调用效率提升 10 倍以上,同时保持高可用性和稳定性