二、稳定性保障:让系统更可靠
2.1 稳定性核心指标
SLA对应允许停机时间(按每年365天计算):
- 99%:约3.65天/年
- 99.9%:约8.76小时/年
- 99.99%:约52.6分钟/年
- 99.999%:约5.26分钟/年
2.2 容错设计模式
2.2.1 超时与重试
class ResilientClient:
    """Client helper that adds a per-attempt timeout and retries with backoff.

    Args:
        timeout: Seconds a single attempt may run before it is cancelled.
        max_retries: Total number of attempts (not extra retries on top of
            the first call). Must be at least 1 when a call is made.
        backoff_factor: Base of the exponential delay. The wait after the
            0-based attempt ``n`` is ``backoff_factor ** n`` seconds, so the
            first wait is always 1 second.
    """

    def __init__(self, timeout=5, max_retries=3, backoff_factor=2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def call_with_retry(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` with a timeout, retrying on failure.

        Args:
            func: Callable returning an awaitable; it is invoked afresh for
                every attempt.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            The result of the first attempt that succeeds.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            TimeoutError: If the final attempt exceeded ``timeout``.
            Exception: Whatever the final attempt raised, if it failed.
        """
        if self.max_retries < 1:
            # Without this guard the loop body never runs and the final
            # ``raise`` would receive None, surfacing as an unrelated
            # TypeError instead of pointing at the misconfiguration.
            raise ValueError("max_retries must be at least 1")

        last_exception = None
        for attempt in range(self.max_retries):
            try:
                # wait_for cancels the attempt once the timeout elapses.
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.timeout,
                )
            except asyncio.TimeoutError:
                last_exception = TimeoutError(f"Timeout after {self.timeout}s")
                logging.warning(f"Attempt {attempt + 1} timed out")
            except Exception as e:
                last_exception = e
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            # No point sleeping after the final attempt.
            if attempt < self.max_retries - 1:
                wait_time = self.backoff_factor ** attempt
                logging.info(f"Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

        raise last_exception
# Use the circuit-breaker pattern to keep one failing dependency from
# dragging down everything that calls it.
class CircuitBreaker:
    """Circuit breaker guarding calls to an unreliable async dependency.

    States:
        CLOSED: Calls pass through; failures are counted.
        OPEN: Calls are rejected until ``timeout`` seconds have passed since
            the last failure.
        HALF_OPEN: Trial calls pass through. ``half_open_requests``
            successes close the breaker; any failure re-opens it.

    Args:
        failure_threshold: Failure count at which a CLOSED breaker opens.
        timeout: Seconds to stay OPEN before allowing trial calls.
        half_open_requests: Successful trial calls required to close again.
    """

    def __init__(self, failure_threshold=5, timeout=60, half_open_requests=3):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_requests = half_open_requests
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # one of CLOSED, OPEN, HALF_OPEN
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the breaker is OPEN and the recovery
                timeout has not elapsed (the exception class is expected to
                be defined elsewhere in the project).
            Exception: Any exception raised by ``func`` is re-raised after
                being counted.
        """
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                logging.info("Circuit breaker moved to HALF_OPEN")
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            if self.state == "HALF_OPEN":
                # A failed trial call means the dependency has not recovered.
                # Re-open so callers are shielded for another full timeout
                # instead of leaving the breaker stuck in HALF_OPEN.
                self.state = "OPEN"
                logging.error("Circuit breaker re-opened after failed trial call")
            elif self.state == "CLOSED" and self.failures >= self.failure_threshold:
                self.state = "OPEN"
                logging.error(f"Circuit breaker opened after {self.failures} failures")
            raise  # bare raise keeps the original traceback intact

        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = "CLOSED"
                self.failures = 0
                logging.info("Circuit breaker closed (recovered)")
        return result
2.2.2 限流与降级
class RateLimiter:
    """Token-bucket rate limiter.

    The bucket starts full, refills continuously at ``rate`` tokens per
    second and never holds more than ``capacity`` tokens. Access to the
    bucket is serialized with a ``threading.Lock``.

    Args:
        rate: Tokens added per second.
        capacity: Maximum number of tokens the bucket can hold.
    """

    def __init__(self, rate=100, capacity=200):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # current token count; starts full
        # Monotonic clock: wall-clock adjustments (NTP steps, manual changes)
        # must not drain or flood the bucket.
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, tokens=1):
        """Try to take ``tokens`` from the bucket without blocking.

        Args:
            tokens: Number of tokens to consume; must not be negative.

        Returns:
            True if the tokens were consumed, False if too few were available.

        Raises:
            ValueError: If ``tokens`` is negative (which would otherwise add
                tokens to the bucket, beyond ``capacity``).
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        """Add the tokens accrued since the last refill, capped at capacity.

        Callers are expected to hold ``self._lock``.
        """
        now = time.monotonic()
        # Clamp at zero so a ``last_refill`` that lies in the future can never
        # subtract tokens from the bucket.
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
# Rate-limiting decorator
def rate_limit(rate=100, capacity=200):
    """Build a decorator that rejects calls once a shared token bucket is empty.

    One ``RateLimiter(rate, capacity)`` is created per ``rate_limit(...)``
    invocation and shared by every call to the decorated function.

    Args:
        rate: Passed to ``RateLimiter`` as the refill rate.
        capacity: Passed to ``RateLimiter`` as the bucket capacity.

    Returns:
        A decorator whose wrapper raises ``RateLimitExceededError`` when no
        token can be acquired and otherwise forwards the call unchanged.
    """
    bucket = RateLimiter(rate, capacity)

    def decorator(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            if bucket.acquire():
                return func(*args, **kwargs)
            raise RateLimitExceededError("Rate limit exceeded")

        return guarded

    return decorator
# Degradation strategy
class DegradationStrategy:
    """Run a primary function, falling back to a cheaper one while degraded.

    Args:
        fallback_func: Callable invoked with the same arguments as the
            primary function whenever the service is degraded or the primary
            call raises.
        degrade_duration: Seconds to stay degraded after ``execute`` sees the
            primary function fail. Defaults to 60.
    """

    def __init__(self, fallback_func, degrade_duration=60):
        self.fallback_func = fallback_func
        self.degrade_duration = degrade_duration
        self.degraded = False
        self.degraded_until = None

    @property
    def is_degraded(self):
        """Whether the fallback should be used right now.

        Reading this property also clears the degraded flag once the
        deadline stored in ``degraded_until`` has passed.
        """
        if self.degraded_until is not None and time.time() > self.degraded_until:
            self.degraded = False
            self.degraded_until = None
        return self.degraded

    def degrade(self, duration=60):
        """Enter degraded mode for ``duration`` seconds from now."""
        self.degraded = True
        self.degraded_until = time.time() + duration
        logging.warning(f"Service degraded for {duration}s")

    def execute(self, primary_func, *args, **kwargs):
        """Call ``primary_func``, or ``fallback_func`` when degraded.

        If ``primary_func`` raises, the error is logged, degraded mode is
        entered for ``degrade_duration`` seconds and the fallback result is
        returned instead.

        Returns:
            The primary result on success, otherwise the fallback result.
        """
        if self.is_degraded:
            return self.fallback_func(*args, **kwargs)
        try:
            return primary_func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Primary function failed: {e}")
            # Use the configured cooldown rather than a fixed 60 seconds.
            self.degrade(self.degrade_duration)
            return self.fallback_func(*args, **kwargs)
# Usage example
def get_recommendations_fallback(user_id):
    """Fallback path: return the default recommendations for any user."""
    return DEFAULT_RECOMMENDATIONS


@rate_limit(capacity=100, rate=50)
def get_recommendations(user_id):
    """Fetch recommendations for ``user_id`` from the recommendation service."""
    return recommendation_service.get(user_id)


# Wrap the primary call so that a failure falls back to the defaults
strategy = DegradationStrategy(fallback_func=get_recommendations_fallback)
result = strategy.execute(get_recommendations, user_id)
2.2.3 幂等性设计
class IdempotentHandler:
    """Provide a decorator that replays cached results for repeated requests.

    Args:
        redis_client: Object exposing ``get(key)`` and
            ``setex(key, ttl, value)``.
        ttl: Seconds a cached result is kept. Defaults to 86400 (one day).
    """

    def __init__(self, redis_client, ttl=86400):
        self.redis = redis_client
        self.ttl = ttl

    def idempotent(self, key_prefix):
        """Return a decorator that deduplicates calls by ``request_id``.

        The wrapped function's result is stored as JSON under
        ``"{key_prefix}:{request_id}"``, so the result must be
        JSON-serializable. The ``request_id`` is read from keyword arguments
        only. Calls without a (truthy) ``request_id`` cannot be matched
        against an earlier call, so they run the function directly and store
        nothing.

        NOTE(review): the lookup, execution and store are separate steps
        with no lock, so two concurrent calls carrying the same
        ``request_id`` can both execute the function.

        Args:
            key_prefix: Namespace prepended to the request id in the key.
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                request_id = kwargs.get('request_id')
                if not request_id:
                    # A freshly generated id could never be presented again,
                    # so caching under it would only cost a lookup and leave
                    # an unreachable entry behind for ``ttl`` seconds.
                    return func(*args, **kwargs)

                idempotent_key = f"{key_prefix}:{request_id}"

                # Replay the stored result if this request was already handled.
                cached = self.redis.get(idempotent_key)
                if cached is not None:
                    logging.info(f"Duplicate request: {request_id}")
                    return json.loads(cached)

                result = func(*args, **kwargs)

                # Remember the result so later duplicates are served from it.
                self.redis.setex(idempotent_key, self.ttl, json.dumps(result))
                return result

            return wrapper

        return decorator
# Usage example
handler = IdempotentHandler(redis_client)


@handler.idempotent("payment")
def process_payment(order_id, amount, request_id=None):
    """Process a payment; a given request_id is meant to execute only once."""
    # Charging logic goes here
    transaction_id = str(uuid.uuid4())
    return {"status": "success", "transaction_id": transaction_id}
2.3 故障演练与混沌工程
# Chaos experiment configuration
class ChaosExperiment:
    """Collects fault-injection experiments and runs them as asyncio tasks."""

    def __init__(self):
        self.experiments = []
        # Strong references to running tasks. The event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._tasks = set()

    def add_latency_injection(self, service, latency_ms=100, probability=0.1):
        """Register a latency fault.

        Args:
            service: Label of the targeted service, stored with the experiment.
            latency_ms: Delay to inject, in milliseconds.
            probability: Chance per one-second tick that the fault fires.
        """
        self.experiments.append({
            "type": "latency",
            "service": service,
            "latency_ms": latency_ms,
            "probability": probability,
        })

    def add_error_injection(self, service, error_code=500, probability=0.05):
        """Register an error fault.

        Args:
            service: Label of the targeted service, stored with the experiment.
            error_code: Status code carried by the raised ``HTTPException``.
            probability: Chance per one-second tick that the fault fires.
        """
        self.experiments.append({
            "type": "error",
            "service": service,
            "error_code": error_code,
            "probability": probability,
        })

    def execute(self):
        """Start one background task per registered experiment.

        Must be called while an event loop is running, because
        ``asyncio.create_task`` raises ``RuntimeError`` otherwise.

        Returns:
            The list of tasks started by this call, so callers can await or
            cancel them.
        """
        started = []
        for exp in self.experiments:
            task = asyncio.create_task(self._run_experiment(exp))
            self._tasks.add(task)
            # Drop the reference once the task finishes so the set stays small.
            task.add_done_callback(self._tasks.discard)
            started.append(task)
        return started

    async def _run_experiment(self, exp):
        """Once per second, fire the fault with the configured probability.

        Runs until cancelled, or until ``_inject_fault`` raises (which ends
        this experiment's task).
        """
        while True:
            if random.random() < exp["probability"]:
                await self._inject_fault(exp)
            await asyncio.sleep(1)

    async def _inject_fault(self, exp):
        """Apply a single fault described by ``exp``."""
        if exp["type"] == "latency":
            # Simulate network latency by sleeping.
            await asyncio.sleep(exp["latency_ms"] / 1000)
        elif exp["type"] == "error":
            # Simulate an error response. HTTPException is expected to be
            # imported elsewhere in the project.
            raise HTTPException(status_code=exp["error_code"])
# Run the chaos experiments in the test environment (opt-in via CHAOS_ENABLED)
if os.environ.get("CHAOS_ENABLED") == "true":
    chaos = ChaosExperiment()
    chaos.add_latency_injection(
        service="database",
        latency_ms=200,
        probability=0.05,
    )
    chaos.add_error_injection(
        service="payment_service",
        error_code=503,
        probability=0.02,
    )
    chaos.execute()