程序员进阶工程师必备技能之性能、稳定性与安全优化(二)

简介: 教程来源 https://unbgv.cn/ 本文系统介绍稳定性保障核心实践:涵盖SLA指标定义、超时重试与断路器容错、令牌桶限流与服务降级、幂等性设计,以及基于混沌工程的故障演练方法,全面提升系统可靠性与韧性。

二、稳定性保障:让系统更可靠

2.1 稳定性核心指标
image.png
SLA对应允许停机时间:
image.png
2.2 容错设计模式
2.2.1 超时与重试

class ResilientClient:
    """具备容错能力的客户端"""

    def __init__(self, timeout=5, max_retries=3, backoff_factor=2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def call_with_retry(self, func, *args, **kwargs):
        """带超时和重试的调用"""
        last_exception = None

        for attempt in range(self.max_retries):
            try:
                # 设置超时
                result = await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=self.timeout
                )
                return result

            except asyncio.TimeoutError:
                last_exception = TimeoutError(f"Timeout after {self.timeout}s")
                logging.warning(f"Attempt {attempt + 1} timed out")

            except Exception as e:
                last_exception = e
                logging.warning(f"Attempt {attempt + 1} failed: {e}")

            # 最后一次不等待
            if attempt < self.max_retries - 1:
                wait_time = self.backoff_factor ** attempt
                logging.info(f"Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

        raise last_exception

# 使用断路器模式防止雪崩
class CircuitBreaker:
    """断路器:防止级联故障"""

    def __init__(self, failure_threshold=5, timeout=60, half_open_requests=3):
        self.failure_threshold = failure_threshold   # 失败阈值
        self.timeout = timeout                       # 恢复超时
        self.half_open_requests = half_open_requests  # 半开状态请求数

        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        self.half_open_successes = 0

    async def call(self, func, *args, **kwargs):
        """带断路器保护的调用"""
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                logging.info("Circuit breaker moved to HALF_OPEN")
            else:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        try:
            result = await func(*args, **kwargs)

            if self.state == "HALF_OPEN":
                self.half_open_successes += 1
                if self.half_open_successes >= self.half_open_requests:
                    self.state = "CLOSED"
                    self.failures = 0
                    logging.info("Circuit breaker closed (recovered)")

            return result

        except Exception as e:
            self.failures += 1
            self.last_failure_time = time.time()

            if self.state == "CLOSED" and self.failures >= self.failure_threshold:
                self.state = "OPEN"
                logging.error(f"Circuit breaker opened after {self.failures} failures")

            raise e

2.2.2 限流与降级

class RateLimiter:
    """令牌桶限流器"""

    def __init__(self, rate=100, capacity=200):
        self.rate = rate          # 令牌生成速率(每秒)
        self.capacity = capacity  # 桶容量
        self.tokens = capacity    # 当前令牌数
        self.last_refill = time.time()
        self._lock = threading.Lock()

    def acquire(self, tokens=1):
        """获取令牌,成功返回True"""
        with self._lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

# 限流装饰器
def rate_limit(rate=100, capacity=200):
    limiter = RateLimiter(rate, capacity)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not limiter.acquire():
                raise RateLimitExceededError("Rate limit exceeded")
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 降级策略
class DegradationStrategy:
    """服务降级策略"""

    def __init__(self, fallback_func):
        self.fallback_func = fallback_func
        self.degraded = False
        self.degraded_until = None

    @property
    def is_degraded(self):
        if self.degraded_until and time.time() > self.degraded_until:
            self.degraded = False
            self.degraded_until = None
        return self.degraded

    def degrade(self, duration=60):
        """进入降级模式"""
        self.degraded = True
        self.degraded_until = time.time() + duration
        logging.warning(f"Service degraded for {duration}s")

    def execute(self, primary_func, *args, **kwargs):
        """执行主逻辑或降级逻辑"""
        if self.is_degraded:
            return self.fallback_func(*args, **kwargs)

        try:
            return primary_func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Primary function failed: {e}")
            self.degrade()
            return self.fallback_func(*args, **kwargs)

# 使用示例
def get_recommendations_fallback(user_id):
    """降级方案:返回默认推荐"""
    return DEFAULT_RECOMMENDATIONS

@rate_limit(rate=50, capacity=100)
def get_recommendations(user_id):
    recommendations = recommendation_service.get(user_id)
    return recommendations

# 包装降级
strategy = DegradationStrategy(get_recommendations_fallback)
result = strategy.execute(get_recommendations, user_id)

2.2.3 幂等性设计

class IdempotentHandler:
    """幂等性处理器"""

    def __init__(self, redis_client, ttl=86400):
        self.redis = redis_client
        self.ttl = ttl

    def idempotent(self, key_prefix):
        """幂等性装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 生成唯一请求ID
                request_id = kwargs.get('request_id') or str(uuid.uuid4())
                idempotent_key = f"{key_prefix}:{request_id}"

                # 检查是否已处理
                result = self.redis.get(idempotent_key)
                if result is not None:
                    logging.info(f"Duplicate request: {request_id}")
                    return json.loads(result)

                # 执行处理
                result = func(*args, **kwargs)

                # 缓存结果
                self.redis.setex(
                    idempotent_key,
                    self.ttl,
                    json.dumps(result)
                )

                return result
            return wrapper
        return decorator

# 使用示例
handler = IdempotentHandler(redis_client)

@handler.idempotent("payment")
def process_payment(order_id, amount, request_id=None):
    """处理支付 - 同一request_id只会执行一次"""
    # 扣款逻辑
    return {"status": "success", "transaction_id": str(uuid.uuid4())}

2.3 故障演练与混沌工程

# 混沌实验配置
class ChaosExperiment:
    """混沌实验执行器"""

    def __init__(self):
        self.experiments = []

    def add_latency_injection(self, service, latency_ms=100, probability=0.1):
        """注入延迟"""
        self.experiments.append({
            "type": "latency",
            "service": service,
            "latency_ms": latency_ms,
            "probability": probability
        })

    def add_error_injection(self, service, error_code=500, probability=0.05):
        """注入错误"""
        self.experiments.append({
            "type": "error",
            "service": service,
            "error_code": error_code,
            "probability": probability
        })

    def execute(self):
        """执行混沌实验"""
        for exp in self.experiments:
            asyncio.create_task(self._run_experiment(exp))

    async def _run_experiment(self, exp):
        while True:
            if random.random() < exp["probability"]:
                await self._inject_fault(exp)
            await asyncio.sleep(1)

    async def _inject_fault(self, exp):
        if exp["type"] == "latency":
            # 注入网络延迟
            await asyncio.sleep(exp["latency_ms"] / 1000)
        elif exp["type"] == "error":
            # 注入错误响应
            raise HTTPException(status_code=exp["error_code"])

# 在测试环境运行混沌实验
if os.getenv("CHAOS_ENABLED") == "true":
    chaos = ChaosExperiment()
    chaos.add_latency_injection("database", latency_ms=200, probability=0.05)
    chaos.add_error_injection("payment_service", error_code=503, probability=0.02)
    chaos.execute()

来源:
https://oieaw.cn/

相关文章
|
16小时前
|
运维 监控 中间件
程序员进阶工程师必备技能之中间件深度使用与运维(二)
教程来源 https://bncne.cn/ 本节深入解析数据库中间件架构原理与MyCat实战:涵盖MySQL协议兼容、智能SQL路由、三级连接池;详解schema.xml/rule.xml配置、哈希/时间分片策略;实现读写分离、负载均衡及全链路运维监控与性能优化。
|
16小时前
|
缓存 运维 安全
程序员进阶工程师必备技能之性能、稳定性与安全优化(一)
教程来源 https://qcycj.cn/ 本文聚焦系统三大生命线——性能、稳定性和安全性,揭示其对用户体验与业务存续的决定性影响。通过代码、架构、运维三层优化方法论,系统性提升响应速度、故障抵御力与数据防护力,构建高可用数字底座。
|
人工智能 弹性计算 安全
AMD产品介绍|通用型实例g8a
g8a实例:高性价比X86服务器,搭载最新CIPU架构,提供100G*2网络带宽和eRDMA支持。基于AMD Genoa平台,主频2.7/3.7GHz,专为性能、成本和稳定性需求设计。适用于通用应用、AI推理训练、高清视频处理等场景。实例性能提升25%,性价比提升15%,内置安全芯片,支持可信计算和机密计算。
|
16小时前
|
缓存 安全 程序员
程序员进阶工程师必备技能之性能、稳定性与安全优化(四)
教程来源 https://vbzcj.cn/ 本文详解性能压测与容量规划实战:基于Locust实现登录、浏览、加购、下单全链路压测;提供QPS计算、服务器/数据库/缓存容量公式,并结合双十一场景示例。辅以优化优先级矩阵,强调性能、稳定、安全三位一体的系统工程思维。
|
16小时前
|
SQL 安全 程序员
程序员进阶工程师必备技能之性能、稳定性与安全优化(三)
教程来源 https://xcfsr.cn/ 本节系统梳理Web、认证、数据及基础设施四层安全威胁,详解SQL注入、XSS、JWT攻击等防御方案,涵盖参数化查询、HTML转义、短时效令牌、字段加密、动态脱敏、API签名与速率限制等实战措施,全面提升系统安全性。
|
13天前
|
人工智能 供应链 监控
2026年五款主流ChatBI产品推荐,适合电商、制造多场景及分析功能详解
本文深度解析2026年五大主流ChatBI工具(瓴羊Quick BI、SmartBI、Power BI、Qlik Sense、Tableau)在电商与制造行业的适配能力。重点剖析瓴羊Quick BI“智能小Q”五大AI Agent,覆盖自然语言查询、自动解读、报告生成、看板搭建与异常洞察,并提供分规模、分场景的实用选型指南。(239字)
|
7月前
|
存储 域名解析 弹性计算
阿里云服务器购买流程,域名注册、解析与备案教程参考
无论是构建企业官网、电商平台,还是部署应用程序,购买云服务器,注册域名,备案及域名解析和绑定是用户要操作的上云步骤。对于有的初次在阿里云完成上云的新手用户来说,这些过程并不是很清楚,本文将以图文形式为大家解析阿里云服务器ECS的购买流程,以及实例规格、存储、网络等核心配置选择注意事项,同步介绍域名注册、备案及解析至云服务器的实操步骤,手把手指导您完成线上业务的全流程部署。
|
人工智能 程序员 测试技术
通义灵码 AI 程序员核心功能体验
阿里云通义灵码AI程序员已全面上线,成为全球首个同时支持 VS Code、JetBrains IDEs 开发工具的AI程序员产品。
1762 1
通义灵码 AI 程序员核心功能体验
|
前端开发 Java PHP
开发体育赛事直播系统:实现聊天交友的私聊功能技术实现全方案解析
本文基于体育赛事直播系统,详细介绍了用户间私聊功能的完整实现方案。技术栈涵盖后端(PHP ThinkPHP)、前端(Vue.js)、移动端(Android Java、iOS OC),并结合MySQL数据库与WebSocket+Redis实现实时通信。功能包括一对一私聊、聊天记录显示、未读消息提示、消息免打扰、聊天置顶、删除/清空聊天记录等。文章提供了数据结构设计、接口代码示例及前后端关键实现细节,适合开发者学习参考。
|
机器学习/深度学习 人工智能 供应链
AI智能分析
AI智能分析运用人工智能技术对数据进行深度挖掘和模式识别,助力商业智能、法律分析、医疗健康、股票市场、产品设计和技术研发等领域。通过机器学习和深度学习,AI能优化商业策略、提升诊断精度、辅助投资决策,并解决技术难题,为各行各业提供精准洞察和决策支持。
774 1